Tutorials
Step-by-step guides to get you started with FinWorld
Tutorial 1: Downloading Financial Data
Learn how to download financial data using FinWorld's downloader system. This tutorial covers the complete workflow from configuration to execution.
Step 1: Define Downloader and Processor
FinWorld provides several built-in downloaders for different data sources:
- FMPDownloader - Financial Modeling Prep API for US market data
- AlpacaDownloader - Alpaca API for real-time US market data
- AkShareDownloader - AkShare for Chinese market data
- TuShareDownloader - TuShare for Chinese financial data
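Each of these is registered in FinWorld's DOWNLOADER registry; the source field in the configuration below selects which backend the PriceDownloader uses. As a rough sketch of the same pattern the built-in script uses (the script also initializes the global config and logger first, which this standalone snippet skips), a downloader could be built and run programmatically:
# Minimal sketch: build and run a downloader from a plain config dict.
# Field names mirror the configuration shown in Step 2.
import asyncio

from finworld.registry import DOWNLOADER

downloader_cfg = dict(
    type="PriceDownloader",
    source="fmp",                                    # or "alpaca", "akshare", "tushare"
    assets_path="configs/_asset_list_/hs300.json",
    start_date="2020-01-01",
    end_date="2024-12-31",
    level="1day",
    format="%Y-%m-%d",
    max_concurrent=50,
)

downloader = DOWNLOADER.build(downloader_cfg)
asyncio.run(downloader.run())                        # run() is a coroutine, as in the script below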
Step 2: Create Configuration File
Create a Python configuration file (e.g., my_download_config.py):
# Basic configuration for downloading HS300 data
workdir = "workdir"
assets_name = "hs300"
source = "fmp" # or "alpaca", "akshare", "tushare"
data_type = "price"
level = "1day" # or "1min" for minute-level data
tag = f"{assets_name}_{source}_{data_type}_{level}"
exp_path = f"{workdir}/{tag}"
log_path = "finworld.log"
# Downloader configuration
downloader = dict(
type="PriceDownloader",
source=source,
assets_path=f"configs/_asset_list_/{assets_name}.json",
start_date="2020-01-01",
end_date="2024-12-31",
level=level,
format="%Y-%m-%d",
max_concurrent=50,
)
Step 3: Use Built-in Download Script
FinWorld provides a built-in download script at scripts/download/download.py:
import argparse
import os
import sys
from pathlib import Path
from mmengine import DictAction
import asyncio
root = str(Path(__file__).resolve().parents[2])
sys.path.append(root)
from finworld.log import logger
from finworld.config import config
from finworld.registry import DOWNLOADER
def parse_args():
parser = argparse.ArgumentParser(description='main')
parser.add_argument("--config", default=os.path.join(root, "configs", "download", "full_symbol_info.py"), help="config file path")
parser.add_argument('--cfg-options', nargs='+', action=DictAction, help='override some settings in the used config')
args = parser.parse_args()
return args
async def main():
# Parse command line arguments
args = parse_args()
# Initialize the configuration
config.init_config(args.config, args)
# Initialize the logger
logger.init_logger(config=config)
logger.info(f"| Logger initialized at: {config.log_path}")
logger.info(f"| Config:\n{config.pretty_text}")
downloader = DOWNLOADER.build(config.downloader)
try:
await downloader.run()
except KeyboardInterrupt:
sys.exit()
if __name__ == '__main__':
asyncio.run(main())
Step 4: Run Download Script
Execute the download using the built-in script:
# Download HS300 data
python scripts/download/download.py --config configs/download/hs300/hs300_fmp_price_1day.py
# Download SP500 data
python scripts/download/download.py --config configs/download/sp500/sp500_fmp_price_1day.py
# Download DJ30 data
python scripts/download/download.py --config configs/download/dj30/dj30_fmp_price_1day.py
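When a run finishes, the downloaded files land under exp_path (workdir/hs300_fmp_price_1day for the configuration above). A quick way to confirm the output, with the caveat that the exact file layout inside that directory depends on the downloader:
# Quick check of what the downloader wrote under exp_path.
from pathlib import Path

exp_path = Path("workdir/hs300_fmp_price_1day")
files = sorted(exp_path.rglob("*"))
print(f"{len(files)} entries under {exp_path}")
for path in files[:10]:
    print(path)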
💡 Pro Tips
- Use max_concurrent to control download speed and avoid rate limits
- Check configs/_asset_list_/ for predefined asset collections
- For large datasets, consider downloading in chunks by date ranges
- Use the provided shell scripts in examples/download.sh for batch downloads
Tutorial 2: Processing Financial Data
Learn how to process downloaded financial data using FinWorld's processor system. This tutorial shows how to clean, transform, and prepare data for machine learning.
Step 1: Understand Data Processing
FinWorld's processor system handles:
- Data Cleaning: Remove missing values, handle outliers
- Feature Engineering: Calculate technical indicators, returns, volatility
- Data Transformation: Normalization, standardization, scaling
- Data Validation: Check data quality and consistency
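The actual transformations are driven by the processor configuration in the next step. As a framework-agnostic illustration of what these stages amount to (plain pandas, not FinWorld's PriceProcessor code), for daily bars they might look like this:
# Illustrative only: pandas versions of the processing stages listed above.
import pandas as pd

df = pd.read_csv("AAPL_1day.csv", parse_dates=["timestamp"])  # hypothetical input file

# Data cleaning: sort by time and forward-fill missing values
df = df.sort_values("timestamp").ffill()

# Feature engineering: returns, rolling volatility, simple moving average
df["ret_1d"] = df["close"].pct_change()
df["vol_20d"] = df["ret_1d"].rolling(20).std()
df["sma_20d"] = df["close"].rolling(20).mean()

# Data transformation: z-score normalization of the raw price/volume columns
cols = ["open", "high", "low", "close", "volume"]
df[cols] = (df[cols] - df[cols].mean()) / df[cols].std()

# Data validation: basic quality checks
assert df["timestamp"].is_monotonic_increasing
print(df[cols + ["ret_1d", "vol_20d", "sma_20d"]].isna().sum())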
Step 2: Create Processing Configuration
Create a processing configuration file (e.g., my_process_config.py):
# Processing configuration for HS300 data
workdir = "workdir"
assets_name = "hs300"
source = "fmp"
data_type = "price"
level = "1day"
tag = f"{assets_name}_{source}_{data_type}_{level}"
exp_path = f"{workdir}/{tag}"
log_path = "finworld.log"
# Processor configuration
processor = dict(
type="PriceProcessor",
source=source,
data_path=exp_path,
output_path=f"{exp_path}_processed",
features=["open", "high", "low", "close", "volume"],
technical_indicators=True,
normalize=True,
fill_missing=True,
)
Step 3: Use Built-in Processing Script
FinWorld provides a built-in processing script at scripts/process/process.py:
import argparse
import os
import sys
from pathlib import Path
from mmengine import DictAction
import asyncio
root = str(Path(__file__).resolve().parents[2])
sys.path.append(root)
from finworld.log import logger
from finworld.config import config
from finworld.registry import PROCESSOR
def parse_args():
parser = argparse.ArgumentParser(description='main')
parser.add_argument("--config", default=os.path.join(root, "configs", "process", "dj30.py"), help="config file path")
parser.add_argument('--cfg-options', nargs='+', action=DictAction, help='override some settings in the used config')
args = parser.parse_args()
return args
async def main():
# Parse command line arguments
args = parse_args()
# Initialize the configuration
config.init_config(args.config, args)
# Initialize the logger
logger.init_logger(config=config)
logger.info(f"| Logger initialized at: {config.log_path}")
logger.info(f"| Config:\n{config.pretty_text}")
processor = PROCESSOR.build(config.processor)
try:
await processor.run()
except KeyboardInterrupt:
sys.exit()
if __name__ == '__main__':
asyncio.run(main())
Step 4: Run Processing Script
Execute the processing using the built-in script:
# Process HS300 data
python scripts/process/process.py --config configs/process/hs300.py
# Process SP500 data
python scripts/process/process.py --config configs/process/sp500.py
# Process DJ30 data
python scripts/process/process.py --config configs/process/dj30.py
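Before training on the output, it is worth a quick sanity check. The sketch below assumes the processor wrote per-symbol CSV files under the configured output_path; adjust the path and reader to whatever format your run produced:
# Sanity-check sketch for processed data; path and file format are assumptions.
import pandas as pd

df = pd.read_csv("workdir/hs300_fmp_price_1day_processed/000001.csv", parse_dates=["timestamp"])

print(df.shape)
print(df.isna().sum())                                # missing values after fill_missing
print(df["timestamp"].min(), df["timestamp"].max())   # date coverage
assert df["timestamp"].is_monotonic_increasing        # rows in chronological order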
💡 Pro Tips
- Use the provided shell scripts in examples/process.sh for batch processing
- Check processed data quality before training models
- Adjust technical indicators based on your trading strategy
- Consider data leakage when engineering features
Tutorial 3: RL Trading with PPO
Learn how to implement reinforcement learning trading using the PPO algorithm. This tutorial covers the complete workflow, from understanding FinWorld's core modules to training.
Step 1: Understanding FinWorld's RL Architecture
FinWorld's RL trading system consists of several key components:
- Dataset: Loads and processes financial data
- Environment: Simulates trading environment with rewards
- Actor-Critic: Neural networks for policy and value estimation
- PPO Agent: Combines actor and critic for RL training
- Trainer: Manages the training loop and policy updates
Step 1.5: FinWorld's Registry System
FinWorld uses a registry system to manage all components. Here's how it works:
# finworld/registry.py
from mmengine.registry import Registry
# Define registries for different component types
DATASET = Registry('data', locations=['finworld.data'])
ENVIRONMENT = Registry('environment', locations=['finworld.environment'])
EMBED = Registry('embed', locations=['finworld.models'])
ENCODER = Registry('encoder', locations=['finworld.models'])
DECODER = Registry('decoder', locations=['finworld.models'])
AGENT = Registry('agent', locations=['finworld.agent'])
TRAINER = Registry('trainer', locations=['finworld.trainer'])
TASK = Registry('task', locations=['finworld.task'])
# Components are registered using decorators (inside their own modules, e.g. finworld/agent/rl/ppo.py)
@AGENT.register_module(force=True)
class PPO(nn.Module):
# Implementation here
pass
@TRAINER.register_module(force=True)
class PPOTradingTrainer():
# Implementation here
pass
# Components are built using the registry
agent = AGENT.build(agent_config)
trainer = TRAINER.build(trainer_config)
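The config passed to build() is a plain dictionary whose type key names a registered class; build pops type, looks the class up, and forwards the remaining keys to its constructor (standard mmengine Registry behavior). A small sketch:
# Sketch: resolving a registered class by name.
from finworld.registry import AGENT

ppo_cls = AGENT.get("PPO")   # the class registered with @AGENT.register_module above
print(ppo_cls)               # e.g. <class 'finworld.agent.rl.ppo.PPO'>

# Building is the same lookup plus construction; see Step 11 for full embed/encoder configs:
# agent = AGENT.build(dict(type="PPO", task_type="trading", ..., action_dim=3))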
Step 2: Dataset - Loading Financial Data
FinWorld's SingleAssetDataset handles financial data loading and preprocessing:
from finworld.data.single_asset_dataset import SingleAssetDataset
from finworld.registry import DATASET
# Create dataset instance
dataset = SingleAssetDataset(
symbol="AAPL",
data_path="workdir/AAPL_fmp_price_1day_processed",
history_timestamps=64,
future_timestamps=32,
start_timestamp="2015-01-01",
end_timestamp="2024-12-31",
level="1day",
if_norm=True,
if_use_temporal=True,
if_use_future=True
)
# Key methods:
# - __getitem__(idx): Returns data for a specific time window
# - _load_asset_info(): Loads asset metadata
# - _init_asset_data(): Processes and normalizes data
# - _cal_time(): Calculates temporal features
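Once constructed, the dataset can be indexed directly; each index yields one history/future window. A quick inspection sketch (the structure and key names of a sample depend on the dataset configuration, so treat the printed names as indicative only):
# Inspection sketch: pull one window via __getitem__ and see what it contains.
sample = dataset[0]
print(type(sample))
if isinstance(sample, dict):
    for key, value in sample.items():
        print(key, getattr(value, "shape", type(value)))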
Step 3: Environment - Trading Simulation
FinWorld's EnvironmentGeneralTrading simulates the trading environment:
from finworld.environment.environment_general_trading import EnvironmentGeneralTrading
from finworld.registry import ENVIRONMENT
# Create trading environment
env = EnvironmentGeneralTrading(
dataset=dataset,
initial_amount=100000, # Starting capital
transaction_cost_pct=0.001, # 0.1% transaction cost
history_timestamps=64,
step_timestamps=1,
future_timestamps=32,
start_timestamp="2015-01-01",
end_timestamp="2024-12-31",
gamma=0.99 # Discount factor
)
# Key methods:
# - reset(): Initialize environment state
# - step(action): Execute action and return next_state, reward, done, truncated, info
# - _init_features(): Load and process market data
# - _cal_reward(): Calculate trading rewards
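A quick way to exercise the environment is a random-policy rollout. The sketch below assumes the gym-style reset/step interface listed above and a small discrete action space (three actions, matching the action_dim=3 used in Step 11); it is meant only to verify that the environment steps cleanly:
# Random-policy rollout sketch.
import random

state, info = env.reset()
done, total_reward, steps = False, 0.0, 0
while not done:
    action = random.randrange(3)                      # placeholder policy
    state, reward, done, truncated, info = env.step(action)
    done = done or truncated
    total_reward += reward
    steps += 1

print(f"random policy: {steps} steps, cumulative reward {total_reward:.4f}")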
Step 4: Actor Network - Policy Learning
FinWorld's Actor network learns the trading policy. Here's the actual implementation:
import torch
import torch.nn as nn
from tensordict import TensorDict
from diffusers.utils.accelerate_utils import apply_forward_hook
from einops import rearrange
from finworld.registry import EMBED, ENCODER
from finworld.models.base import Model
from finworld.models.modules.attention_pool import AttentionPool1D
from finworld.models.modules.mean_pool import MeanPool1D
from finworld.task import TaskType
from finworld.models.embed.position import SinCosPosition1DEmbed, SinCosPosition2DEmbed
class Actor(Model):
def __init__(self,
task_type: str,
embed_config: dict,
encoder_config: dict,
action_dim: int,
output_dim: int,
pool_type: str = 'avg', # 'avg' or 'attn'
**kwargs):
super(Actor, self).__init__()
self.task_type = TaskType.from_string(task_type)
self.action_dim = action_dim
self.output_dim = output_dim
# Build embedding layer using registry
self.embed = EMBED.build(embed_config)
# Position embedding based on task type
if self.task_type == TaskType.TRADING:
self.pos_embed = SinCosPosition1DEmbed(
num_positions=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
embed_dim=embed_config["output_dim"],
num_prefix=0
)
elif self.task_type == TaskType.PORTFOLIO:
self.pos_embed = SinCosPosition2DEmbed(
num_time=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
num_space= embed_config["num_assets"],
embed_dim=embed_config["output_dim"],
num_prefix=0
)
# Build encoder using registry
self.encoder = ENCODER.build(encoder_config)
# Pooling layer
self.pool_type = pool_type
if self.pool_type == 'avg':
self.pool = MeanPool1D(dim=1, keepdim=False)
elif self.pool_type == 'attn':
self.pool = AttentionPool1D(
in_features=encoder_config['output_dim'],
out_features=output_dim,
embed_dim=encoder_config['latent_dim'],
num_heads=encoder_config['num_heads'],
)
# Final decoder layer
self.decoder = nn.Linear(encoder_config['output_dim'], output_dim)
self.initialize_weights()
@apply_forward_hook
def encode(self, x: TensorDict):
"""Encode input features through embedding and encoder"""
x = self.embed(x)
# Add positional embedding
pos = self.pos_embed(x)
x = x + pos
# Process through encoder
x, _, _ = self.encoder(x)
return x
@apply_forward_hook
def decode(self, x: TensorDict):
"""Decode encoded features to action logits"""
x = self.pool(x)
x = self.decoder(x)
return x
def forward(self, x: TensorDict):
"""Complete forward pass"""
batch_size = x.batch_size
# Handle multi-environment case
if len(batch_size) == 2:
x = TensorDict(
{
key: rearrange(value, 'b e ... -> (b e) ...') for key, value in x.items()
},
batch_size=batch_size[0] * batch_size[1]
)
x = self.encode(x)
x = self.decode(x)
# Reshape back for multi-environment case
if len(batch_size) == 2:
x = rearrange(x, '(b e) ... -> b e ...', b=batch_size[0])
return x
Step 5: Critic Network - Value Estimation
FinWorld's Critic network estimates state values. Here's the actual implementation:
import torch
import torch.nn as nn
from tensordict import TensorDict
from diffusers.utils.accelerate_utils import apply_forward_hook
from einops import rearrange
from finworld.registry import EMBED, ENCODER
from finworld.models.base import Model
from finworld.models.modules.attention_pool import AttentionPool1D
from finworld.models.modules.mean_pool import MeanPool1D
from finworld.models.embed import SparseEmbed, DenseLinearEmbed
from finworld.task import TaskType
from finworld.models.embed.position import SinCosPosition2DEmbed, SinCosPosition1DEmbed
class Critic(Model):
def __init__(self,
task_type: str,
embed_config: dict,
encoder_config: dict,
action_dim: int,
output_dim: int,
pool_type: str = 'avg', # 'avg' or 'attn'
**kwargs):
super(Critic, self).__init__()
self.task_type = TaskType.from_string(task_type)
# Build embedding layer using registry
self.embed = EMBED.build(embed_config)
# Position embedding based on task type
if self.task_type == TaskType.TRADING:
self.pos_embed = SinCosPosition1DEmbed(
num_positions=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
embed_dim=embed_config["output_dim"],
num_prefix=0
)
elif self.task_type == TaskType.PORTFOLIO:
self.pos_embed = SinCosPosition2DEmbed(
num_time=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
num_space= embed_config["num_assets"],
embed_dim=embed_config["output_dim"],
)
# Build encoder using registry
self.encoder = ENCODER.build(encoder_config)
# Pooling layer
if pool_type == 'avg':
self.pool = MeanPool1D(dim=1, keepdim=False)
elif pool_type == 'attn':
self.pool = AttentionPool1D(
in_features=encoder_config['output_dim'],
out_features=encoder_config['output_dim'],
embed_dim=encoder_config['latent_dim'],
num_heads=encoder_config['num_heads'],
)
# Final decoder layer
self.decoder = nn.Linear(encoder_config['output_dim'], output_dim)
# Action encoder for action-value functions
if self.task_type == TaskType.TRADING:
self.action_encoder = SparseEmbed(
num_embeddings=action_dim,
output_dim=encoder_config['output_dim'],
)
elif self.task_type == TaskType.PORTFOLIO:
self.action_encoder = DenseLinearEmbed(
input_dim=action_dim,
output_dim=encoder_config['output_dim'],
)
# Attention fusion for action-value
self.attn_fusion = nn.MultiheadAttention(
embed_dim=encoder_config['output_dim'],
num_heads=encoder_config['num_heads'],
batch_first=True,
)
self.initialize_weights()
@apply_forward_hook
def encode(self, x: TensorDict):
"""Encode input features through embedding and encoder"""
x = self.embed(x)
# Add positional embedding
pos = self.pos_embed(x)
x = x + pos
# Process through encoder
x, _, _ = self.encoder(x)
return x
@apply_forward_hook
def decode(self, x: torch.Tensor, a: torch.Tensor = None):
"""Decode encoded features to value estimate"""
x = self.pool(x)
# Optionally fuse with action information
if a is not None:
a = self.action_encoder(a)
x = self.attn_fusion(x, a, a)[0]
x = self.decoder(x)
x = x.squeeze(-1)
return x
def forward(self, x: TensorDict, a: torch.Tensor = None):
"""Complete forward pass"""
batch_size = x.batch_size
# Handle multi-environment case
if len(batch_size) == 2:
x = TensorDict(
{
key: rearrange(value, 'b e ... -> (b e) ...') for key, value in x.items()
},
batch_size=batch_size[0] * batch_size[1]
)
if a is not None:
a = rearrange(a, 'b e ... -> (b e) ...')
x = self.encode(x)
x = self.decode(x, a)
# Reshape back for multi-environment case
if len(batch_size) == 2:
x = rearrange(x, '(b e) ... -> b e ...', b=batch_size[0])
return x
Step 6: PPO Agent - Combining Actor and Critic
FinWorld's PPO agent combines actor and critic for RL training. Here's the actual implementation:
import torch
import torch.nn as nn
from functools import partial
from einops import rearrange
from torch.distributions.categorical import Categorical
from typing import Tuple
from tensordict import TensorDict
from torch import Tensor
import torch.nn.functional as F
from torch.distributions import Dirichlet
from finworld.registry import AGENT
from finworld.models.rl.actor import Actor
from finworld.models.rl.critic import Critic
from finworld.task import TaskType
@AGENT.register_module(force=True)
class PPO(nn.Module):
def __init__(self,
*args,
task_type: str,
embed_config: dict,
encoder_config: dict,
action_dim: int,
actor_output_dim: int,
critic_output_dim: int,
**kwargs
):
super(PPO, self).__init__()
self.task_type = TaskType.from_string(task_type)
self.action_dim = action_dim
self.actor_output_dim = actor_output_dim
self.critic_output_dim = critic_output_dim
# Create actor network
self.actor = Actor(
task_type=task_type,
embed_config=embed_config,
encoder_config=encoder_config,
action_dim=action_dim,
output_dim=actor_output_dim
)
# Create critic network
self.critic = Critic(
task_type=task_type,
embed_config=embed_config,
encoder_config=encoder_config,
action_dim=action_dim,
output_dim=critic_output_dim
)
def get_value(self, x: TensorDict, a: Tensor = None):
"""Get state value estimate"""
return self.critic(x, a)
def get_action(self, x: TensorDict):
"""Sample action from policy"""
batch_size = x.batch_size
# Get action logits from actor
logits = self.actor(x)
# Create categorical distribution
dist = Categorical(logits=logits)
# Sample action
action = dist.sample()
# Get log probability
log_prob = dist.log_prob(action)
return action, log_prob
def get_action_and_value(self, x: TensorDict):
"""Get both action and value in one forward pass"""
batch_size = x.batch_size
# Get action logits from actor
logits = self.actor(x)
# Create categorical distribution
dist = Categorical(logits=logits)
# Sample action
action = dist.sample()
# Get log probability
log_prob = dist.log_prob(action)
# Get value estimate
value = self.critic(x, action)
return action, log_prob, value
def evaluate_actions(self, x: TensorDict, action: Tensor):
"""Evaluate actions for policy updates"""
batch_size = x.batch_size
# Get action logits from actor
logits = self.actor(x)
# Create categorical distribution
dist = Categorical(logits=logits)
# Get log probability of given actions
log_prob = dist.log_prob(action)
# Get entropy
entropy = dist.entropy()
# Get value estimate
value = self.critic(x, action)
return log_prob, value, entropy
def forward(self, *input, **kwargs):
"""Forward pass - not used in PPO"""
pass
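During rollouts the trainer calls get_action_and_value, and during updates it calls evaluate_actions on the stored transitions; the ratio between the new and old log-probabilities drives PPO's clipped objective (Step 7). A self-contained numeric sketch of that last step, with dummy stand-ins for the quantities a rollout batch would provide:
# Worked sketch of the clipped surrogate objective; the tensors are dummy stand-ins
# for (new_log_prob from evaluate_actions, stored log_prob, advantages).
import torch

new_log_prob = torch.tensor([-1.0, -0.5, -2.0])
old_log_prob = torch.tensor([-1.2, -0.6, -1.5])
advantages = torch.tensor([0.3, -0.1, 0.8])
clip_ratio = 0.2

ratio = torch.exp(new_log_prob - old_log_prob)
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - clip_ratio, 1 + clip_ratio) * advantages
policy_loss = -torch.min(surr1, surr2).mean()
print(policy_loss)   # scalar policy loss, as computed in the trainer's update_policy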
Step 7: PPO Trainer - Training Loop
FinWorld's PPOTradingTrainer manages the training process. Here's the actual implementation:
import torch
from torch import nn
import gym
import torch.optim as optim
import time
import numpy as np
import os
from copy import deepcopy
from tensordict import TensorDict
from einops import rearrange
from finworld.registry import TRAINER
from finworld.registry import ENVIRONMENT
from finworld.environment import make_env
from finworld.utils import build_storage, TradingRecords
from finworld.log import logger
@TRAINER.register_module(force=True)
class PPOTradingTrainer():
def __init__(self,
*args,
config = None,
dataset = None,
agent = None,
metrics = None,
device = None,
dtype = None,
**kwargs):
# Build training environments
train_environment_config = deepcopy(config.train_environment)
train_environment_config.update({"dataset": dataset})
self.train_environment = ENVIRONMENT.build(train_environment_config)
# Build validation environments
valid_environment_config = deepcopy(config.valid_environment)
valid_environment_config.update({"dataset": dataset})
self.valid_environment = ENVIRONMENT.build(valid_environment_config)
# Build test environments
test_environment_config = deepcopy(config.test_environment)
test_environment_config.update({"dataset": dataset})
self.test_environment = ENVIRONMENT.build(test_environment_config)
# Create vectorized environments
env_name = self.train_environment.__class__.__name__
self.train_environments = gym.vector.AsyncVectorEnv([
make_env(env_name, env_params=dict(env=deepcopy(self.train_environment),
transition_shape=config.transition_shape,
seed=config.seed + i))
for i in range(config.num_envs)
])
# Initialize agent and optimizers
self.agent = agent
self.policy_optimizer = optim.Adam(agent.actor.parameters(), lr=config.policy_learning_rate)
self.value_optimizer = optim.Adam(agent.critic.parameters(), lr=config.value_learning_rate)
# Training configuration
self.config = config
self.device = device
self.dtype = dtype
self.metrics = metrics
# Storage for experience
self.storage = build_storage(config.transition_shape, config.batch_size)
def explore_environment(self, init_state, init_info, reset=False):
"""Collect experience from environment"""
if reset:
state, info = self.train_environments.reset()
else:
state, info = init_state, init_info
# Collect experience for num_steps
for step in range(self.config.num_steps):
# Get action from agent
with torch.no_grad():
action, log_prob, value = self.agent.get_action_and_value(state)
# Execute action in environment
next_state, reward, done, truncated, info = self.train_environments.step(action)
# Store experience
self.storage.store({
'features': state['features'],
'times': state['times'],
'policy_trading_cashes': state.get('cashes', torch.zeros_like(reward)),
'policy_trading_positions': state.get('positions', torch.zeros_like(reward)),
'policy_trading_actions': action,
'policy_trading_rets': reward,
'training_values': value,
'training_dones': done,
'training_actions': action,
'training_logprobs': log_prob,
'training_rewards': reward,
'training_advantages': torch.zeros_like(reward), # Will be computed later
'training_returns': torch.zeros_like(reward) # Will be computed later
})
state = next_state
def update_policy(self, flat_storage, b_inds, info):
"""Update actor and critic networks using PPO"""
# Get batch data
batch_data = {key: flat_storage[key][b_inds] for key in flat_storage.keys()}
# Compute advantages and returns
advantages = self.compute_advantages(batch_data)
returns = advantages + batch_data['training_values']
# Normalize advantages
if self.config.norm_adv:
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# Update for multiple epochs
for epoch in range(self.config.update_epochs):
# Get new action probabilities
new_log_prob, new_value, entropy = self.agent.evaluate_actions(
TensorDict({
'features': batch_data['features'],
'times': batch_data['times'],
'cashes': batch_data['policy_trading_cashes'],
'positions': batch_data['policy_trading_positions'],
'rets': batch_data['policy_trading_rets'],
'actions': batch_data['training_actions']
}),
batch_data['training_actions']
)
# Compute policy loss
ratio = torch.exp(new_log_prob - batch_data['training_logprobs'])
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - self.config.clip_ratio, 1 + self.config.clip_ratio) * advantages
policy_loss = -torch.min(surr1, surr2).mean()
# Compute value loss
if self.config.clip_vloss:
value_pred_clipped = batch_data['training_values'] + torch.clamp(
new_value - batch_data['training_values'],
-self.config.clip_ratio,
self.config.clip_ratio
)
value_losses = (new_value - returns).pow(2)
value_losses_clipped = (value_pred_clipped - returns).pow(2)
value_loss = torch.max(value_losses, value_losses_clipped).mean()
else:
value_loss = (new_value - returns).pow(2).mean()
# Compute entropy loss
entropy_loss = entropy.mean()
# Total loss
total_loss = policy_loss + self.config.value_loss_coef * value_loss - self.config.entropy_coef * entropy_loss
# Update actor
self.policy_optimizer.zero_grad()
policy_loss.backward(retain_graph=True)
torch.nn.utils.clip_grad_norm_(self.agent.actor.parameters(), self.config.max_grad_norm)
self.policy_optimizer.step()
# Update critic
self.value_optimizer.zero_grad()
value_loss.backward()
torch.nn.utils.clip_grad_norm_(self.agent.critic.parameters(), self.config.max_grad_norm)
self.value_optimizer.step()
# Log metrics
info.update({
'policy_loss': policy_loss.item(),
'value_loss': value_loss.item(),
'entropy_loss': entropy_loss.item(),
'total_loss': total_loss.item()
})
def train(self):
"""Main training loop"""
start_time = time.time()
# Reset environments
state, info = self.train_environments.reset()
# Calculate number of updates
num_updates = self.config.total_steps // self.config.batch_size
warm_up_updates = self.config.warm_up_steps // self.config.batch_size
is_warmup = True
for update in range(1, num_updates + 1 + warm_up_updates):
if is_warmup and update > warm_up_updates:
is_warmup = False
# Learning rate annealing
if self.config.anneal_lr and not is_warmup:
frac = 1.0 - (update - 1.0 - warm_up_updates) / num_updates
self.policy_optimizer.param_groups[0]["lr"] = frac * self.config.policy_learning_rate
self.value_optimizer.param_groups[0]["lr"] = frac * self.config.value_learning_rate
# Collect experience
self.explore_environment(init_state=state, init_info=info, reset=False)
# Flatten storage
flat_storage = self.flatten_storage(self.storage)
b_inds = np.arange(self.config.batch_size)
# Update policy (losses are recorded into `info` in place)
self.update_policy(flat_storage, b_inds, info)
# Log progress
if update % self.config.check_steps == 0:
logger.info(f"Update {update}/{num_updates + warm_up_updates}")
logger.info(f"Policy Loss: {info.get('policy_loss', 0):.4f}")
logger.info(f"Value Loss: {info.get('value_loss', 0):.4f}")
logger.info(f"Entropy Loss: {info.get('entropy_loss', 0):.4f}")
# Save checkpoint
if update % self.config.save_steps == 0:
self.save_checkpoint(update)
def compute_advantages(self, batch_data):
"""Compute advantages using GAE"""
# Generalized Advantage Estimation (GAE), simplified: the last step bootstraps with next_value = 0
rewards = batch_data['training_rewards']
values = batch_data['training_values']
dones = batch_data['training_dones']
advantages = torch.zeros_like(rewards)
last_advantage = 0
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
next_value = 0
else:
next_value = values[t + 1]
delta = rewards[t] + self.config.gamma * next_value * (1 - dones[t]) - values[t]
advantages[t] = last_advantage = delta + self.config.gamma * self.config.gae_lambda * last_advantage * (1 - dones[t])
return advantages
def flatten_storage(self, storage):
"""Flatten storage for batch processing"""
# Implementation would flatten the storage structure
return storage
def save_checkpoint(self, update):
"""Save model checkpoint"""
checkpoint = {
'update': update,
'actor_state_dict': self.agent.actor.state_dict(),
'critic_state_dict': self.agent.critic.state_dict(),
'policy_optimizer_state_dict': self.policy_optimizer.state_dict(),
'value_optimizer_state_dict': self.value_optimizer.state_dict(),
}
torch.save(checkpoint, f"checkpoint_{update}.pth")
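Because save_checkpoint stores plain state dicts, restoring a trained agent is symmetric. A minimal sketch, given an agent built as in Step 11 (the checkpoint file name is whatever save_checkpoint wrote, e.g. checkpoint_1000.pth):
# Sketch: reload a checkpoint written by save_checkpoint above.
import torch

checkpoint = torch.load("checkpoint_1000.pth", map_location="cpu")   # path is illustrative
agent.actor.load_state_dict(checkpoint["actor_state_dict"])
agent.critic.load_state_dict(checkpoint["critic_state_dict"])

# Optimizer states are only needed when resuming training:
# policy_optimizer.load_state_dict(checkpoint["policy_optimizer_state_dict"])
# value_optimizer.load_state_dict(checkpoint["value_optimizer_state_dict"])

agent.eval()   # switch to evaluation mode for backtesting
print("resumed from update", checkpoint["update"])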
Step 8: Transformer Decoder - Reconstruction
FinWorld's TransformerDecoder is used for reconstruction tasks. Here's the actual implementation:
import torch
import torch.nn as nn
from finworld.registry import DECODER
from finworld.registry import EMBED
from finworld.models.decoder.base import Decoder
from finworld.models.modules import TransformerRopeBlock as Block
@DECODER.register_module(force=True)
class TransformerDecoder(Decoder):
def __init__(self,
*args,
input_dim: int = 128,
latent_dim: int = 128,
output_dim: int = 5,
depth: int = 2,
num_heads: int = 4,
mlp_ratio: float = 4.0,
norm_layer=nn.LayerNorm,
cls_embed: bool = True,
no_qkv_bias: bool = False,
trunc_init: bool = False,
if_mask: bool = False,
if_remove_cls_embed=True,
**kwargs
):
super(TransformerDecoder, self).__init__()
self.cls_embed = cls_embed
self.input_dim = input_dim
self.latent_dim = latent_dim
self.output_dim = output_dim if output_dim is not None else input_dim
self.depth = depth
self.num_heads = num_heads
self.mlp_ratio = mlp_ratio
self.norm_layer = norm_layer
self.trunc_init = trunc_init
self.if_mask = if_mask
self.if_remove_cls_embed = if_remove_cls_embed
# Linear projection to latent dimension
self.to_latent = nn.Linear(input_dim, latent_dim)
# CLS token for classification
if self.cls_embed:
self.cls_token = nn.Parameter(torch.zeros(1, 1, latent_dim))
# Mask token for masked reconstruction
self.mask_token = nn.Parameter(torch.zeros(1, 1, latent_dim))
# Transformer blocks
self.blocks = nn.ModuleList([
Block(latent_dim,
num_heads,
mlp_ratio,
qkv_bias=not no_qkv_bias,
norm_layer=norm_layer)
for i in range(depth)
])
# Final normalization and projection
self.norm = norm_layer(latent_dim)
self.proj = nn.Linear(latent_dim, self.output_dim)
self.initialize_weights()
def forward(self,
sample: torch.FloatTensor,
ids_restore: torch.LongTensor = None) -> torch.FloatTensor:
"""Forward pass through the decoder"""
# Project to latent dimension
sample = self.to_latent(sample) # (N, M, D)
N, M, D = sample.shape
# Handle masking if needed
if self.if_mask and ids_restore is not None:
L = ids_restore.shape[-1]
num_mask = L - M
assert num_mask >= 0, "Mask token count cannot be negative"
# Add mask tokens
mask_tokens = self.mask_token.repeat(N, num_mask, 1) # (N, L-M, D)
sample_ = torch.cat([sample, mask_tokens], dim=1) # (N, L, D)
# Restore original order
sample = torch.gather(
sample_,
dim=1,
index=ids_restore.unsqueeze(-1).repeat(1, 1, D) # (N, L, D)
)
else:
L = sample.shape[1]
sample = sample.view(N, L, D)
# Add CLS token if needed
if self.cls_embed:
cls_token = self.cls_token.expand(sample.shape[0], -1, -1)
sample = torch.cat((cls_token, sample), dim=1)
# Apply Transformer blocks
for blk in self.blocks:
sample = blk(sample)
sample = self.norm(sample)
# Remove CLS token if needed
if self.cls_embed and self.if_remove_cls_embed:
sample = sample[:, 1:, :]
# Final projection
sample = self.proj(sample)
return sample
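The only non-obvious piece here is the masked-reconstruction path: encoder outputs for the kept tokens are concatenated with repeated mask tokens and then scattered back into their original positions with ids_restore. A standalone PyTorch sketch of that gather, independent of FinWorld:
# Standalone sketch of the ids_restore gather used in TransformerDecoder.forward.
import torch

N, L, D = 2, 8, 4                    # batch size, full sequence length, latent dim
num_keep = 6                         # number of tokens that survived masking (M)

# ids_shuffle is a per-sample permutation of token positions; the first num_keep
# entries are the kept tokens, the rest were dropped and replaced by mask tokens.
ids_shuffle = torch.stack([torch.randperm(L) for _ in range(N)])   # (N, L)
ids_restore = torch.argsort(ids_shuffle, dim=1)                    # (N, L)

kept = torch.randn(N, num_keep, D)                                 # stand-in for encoder output
mask_tokens = torch.zeros(1, 1, D).repeat(N, L - num_keep, 1)      # (N, L - M, D)

x = torch.cat([kept, mask_tokens], dim=1)                          # (N, L, D), shuffled order
x = torch.gather(x, dim=1, index=ids_restore.unsqueeze(-1).repeat(1, 1, D))
print(x.shape)                                                     # torch.Size([2, 8, 4])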
Step 9: Training Loop Implementation
Here's how the training loop works in FinWorld:
def train_ppo_trading():
# Initialize components
dataset = SingleAssetDataset(...)
agent = PPO(...)
trainer = PPOTradingTrainer(...)
# Reset the vectorized environments once before training
state, info = trainer.train_environments.reset()
# Training loop
for update in range(num_updates):
# 1. Collect experience
trainer.explore_environment(init_state=state, init_info=info, reset=False)
# 2. Flatten collected data
flat_storage = trainer.flatten_storage(trainer.storage)
# 3. Update actor and critic (update_policy loops over update_epochs internally)
b_inds = np.arange(batch_size)
trainer.update_policy(flat_storage, b_inds, info)
# 4. Log metrics and save checkpoints
if update % log_freq == 0:
trainer.log_metrics(update)
if update % save_freq == 0:
trainer.save_checkpoint(update)
Step 10: Environment Interaction
Understanding how the agent interacts with the trading environment:
def trading_episode(agent, env):
# Reset environment
state, info = env.reset()
done = False
total_reward = 0
while not done:
# Get action from agent
action, log_prob, value = agent.get_action_and_value(state)
# Execute action in environment
next_state, reward, done, truncated, info = env.step(action)
# Store experience
experience = {
'state': state,
'action': action,
'reward': reward,
'value': value,
'log_prob': log_prob,
'done': done
}
# Update state
state = next_state
total_reward += reward
return total_reward, experience
Step 11: Complete Training Script
Here's a complete example of training a PPO trading agent:
import torch
import numpy as np
from finworld.data.single_asset_dataset import SingleAssetDataset
from finworld.environment.environment_general_trading import EnvironmentGeneralTrading
from finworld.agent.rl.ppo import PPO
from finworld.trainer.rl.ppo_trading_trainer import PPOTradingTrainer
from finworld.registry import DATASET, ENVIRONMENT, AGENT, TRAINER
def main():
# 1. Load dataset
dataset = SingleAssetDataset(
symbol="AAPL",
data_path="workdir/AAPL_fmp_price_1day_processed",
history_timestamps=64,
future_timestamps=32,
start_timestamp="2015-01-01",
end_timestamp="2024-12-31",
level="1day"
)
# 2. Create environment (for illustration; the trainer below builds its own train/valid/test environments from the config)
env = EnvironmentGeneralTrading(
dataset=dataset,
initial_amount=100000,
transaction_cost_pct=0.001,
history_timestamps=64,
step_timestamps=1,
future_timestamps=32
)
# 3. Create agent
agent = PPO(
task_type="trading",
embed_config={
"type": "TradingPatchEmbed",
"dense_input_dim": 5,
"sparse_input_dim": 4,
"latent_dim": 64,
"output_dim": 64,
"start_timestamp": "2015-01-01",
"end_timestamp": "2024-12-31",
"patch_timestamps": 4,
"history_timestamps": 64,
"level": "1day",
"if_use_sparse": True,
"if_use_trajectory": True,
"dropout": 0.1
},
encoder_config={
"type": "TransformerEncoder",
"input_dim": 64,
"latent_dim": 64,
"output_dim": 64,
"depth": 2,
"num_heads": 4,
},
action_dim=3,
actor_output_dim=3,
critic_output_dim=1
)
# 4. Create trainer (assumes `config` and `metrics` are defined elsewhere, e.g. a FinWorld config loaded with config.init_config)
trainer = PPOTradingTrainer(
config=config,
dataset=dataset,
agent=agent,
metrics=metrics,
device=torch.device("cuda"),
dtype=torch.float32
)
# 5. Start training
trainer.train()
if __name__ == "__main__":
main()
Step 12: Running the Training
Execute the training using FinWorld's built-in scripts:
# Single GPU training
python scripts/rl_trading/train.py --config configs/rl_trading/ppo/AAPL_ppo_trading.py
# Multi-GPU training
CUDA_VISIBLE_DEVICES=0,1 python scripts/rl_trading/train.py --config configs/rl_trading/ppo/AAPL_ppo_trading.py
# Using shell script
bash examples/ppo_trading.sh
💡 Pro Tips
- Data Preparation: Ensure your data is properly processed and normalized
- Environment Design: Tune reward functions to match your trading objectives
- Model Architecture: Experiment with different embedding and encoder configurations
- Training Parameters: Adjust learning rates, batch sizes, and update epochs
- Monitoring: Use TensorBoard to track training progress and metrics
- Evaluation: Test your trained agent on out-of-sample data
Next Steps
Now that you've completed the basic tutorials, you can explore more advanced features:
- Advanced Models: Try transformer-based models for time series forecasting
- Reinforcement Learning: Implement RL agents for trading and portfolio management
- LLM Integration: Use large language models for financial reasoning
- Multi-Asset Strategies: Develop portfolio-level trading strategies
- Real-time Trading: Connect to live data feeds and execute trades
Check out our Examples section for more complex use cases and the API Reference for detailed documentation.