Tutorials

Step-by-step guides to get you started with FinWorld

Tutorial 1: Downloading Financial Data

Learn how to download financial data using FinWorld's downloader system. This tutorial covers the complete workflow from configuration to execution.

Step 1: Define Downloader and Processor

FinWorld provides several built-in downloaders for different data sources:

  • FMPDownloader - Financial Modeling Prep API for US market data
  • AlpacaDownloader - Alpaca API for real-time US market data
  • AkShareDownloader - AkShare for Chinese market data
  • TuShareDownloader - TuShare for Chinese financial data

Step 2: Create Configuration File

Create a Python configuration file (e.g., my_download_config.py):

# Basic configuration for downloading HS300 data
workdir = "workdir"
assets_name = "hs300"
source = "fmp"  # or "alpaca", "akshare", "tushare"
data_type = "price"
level = "1day"  # or "1min" for minute-level data
tag = f"{assets_name}_{source}_{data_type}_{level}"
exp_path = f"{workdir}/{tag}"
log_path = "finworld.log"

# Downloader configuration
downloader = dict(
    type="PriceDownloader",
    source=source,
    assets_path=f"configs/_asset_list_/{assets_name}.json",
    start_date="2020-01-01",
    end_date="2024-12-31",
    level=level,
    format="%Y-%m-%d",
    max_concurrent=50,
)

Step 3: Use Built-in Download Script

FinWorld provides a built-in download script at scripts/download/download.py:

import argparse
import os
import sys
from pathlib import Path
from mmengine import DictAction
import asyncio

root = str(Path(__file__).resolve().parents[2])
sys.path.append(root)

from finworld.log import logger
from finworld.config import config
from finworld.registry import DOWNLOADER

def parse_args():
    parser = argparse.ArgumentParser(description='main')
    parser.add_argument("--config", default=os.path.join(root, "configs", "download", "full_symbol_info.py"), help="config file path")
    parser.add_argument('--cfg-options', nargs='+', action=DictAction, help='override some settings in the used config')
    args = parser.parse_args()
    return args

async def main():
    # Parse command line arguments
    args = parse_args()
    
    # Initialize the configuration
    config.init_config(args.config, args)
    
    # Initialize the logger
    logger.init_logger(config=config)
    logger.info(f"| Logger initialized at: {config.log_path}")
    logger.info(f"| Config:\n{config.pretty_text}")
    
    downloader = DOWNLOADER.build(config.downloader)
    
    try:
        await downloader.run()
    except KeyboardInterrupt:
        sys.exit()

if __name__ == '__main__':
    asyncio.run(main())

Step 4: Run Download Script

Execute the download using the built-in script:

# Download HS300 data
python scripts/download/download.py --config configs/download/hs300/hs300_fmp_price_1day.py

# Download SP500 data
python scripts/download/download.py --config configs/download/sp500/sp500_fmp_price_1day.py

# Download DJ30 data
python scripts/download/download.py --config configs/download/dj30/dj30_fmp_price_1day.py

💡 Pro Tips

  • Use max_concurrent to control download speed and avoid rate limits
  • Check configs/_asset_list_/ for predefined asset collections
  • For large datasets, consider downloading in chunks by date ranges
  • Use the provided shell scripts in examples/download.sh for batch downloads

Tutorial 2: Processing Financial Data

Learn how to process downloaded financial data using FinWorld's processor system. This tutorial shows how to clean, transform, and prepare data for machine learning.

Step 1: Understand Data Processing

FinWorld's processor system handles:

  • Data Cleaning: Remove missing values, handle outliers
  • Feature Engineering: Calculate technical indicators, returns, volatility
  • Data Transformation: Normalization, standardization, scaling
  • Data Validation: Check data quality and consistency

Step 2: Create Processing Configuration

Create a processing configuration file (e.g., my_process_config.py):

# Processing configuration for HS300 data
workdir = "workdir"
assets_name = "hs300"
source = "fmp"
data_type = "price"
level = "1day"
tag = f"{assets_name}_{source}_{data_type}_{level}"
exp_path = f"{workdir}/{tag}"
log_path = "finworld.log"

# Processor configuration
processor = dict(
    type="PriceProcessor",
    source=source,
    data_path=exp_path,
    output_path=f"{exp_path}_processed",
    features=["open", "high", "low", "close", "volume"],
    technical_indicators=True,
    normalize=True,
    fill_missing=True,
)

Step 3: Use Built-in Processing Script

FinWorld provides a built-in processing script at scripts/process/process.py:

import argparse
import os
import sys
from pathlib import Path
from mmengine import DictAction
import asyncio

root = str(Path(__file__).resolve().parents[2])
sys.path.append(root)

from finworld.log import logger
from finworld.config import config
from finworld.registry import PROCESSOR

def parse_args():
    parser = argparse.ArgumentParser(description='main')
    parser.add_argument("--config", default=os.path.join(root, "configs", "process", "dj30.py"), help="config file path")
    parser.add_argument('--cfg-options', nargs='+', action=DictAction, help='override some settings in the used config')
    args = parser.parse_args()
    return args

async def main():
    # Parse command line arguments
    args = parse_args()
    
    # Initialize the configuration
    config.init_config(args.config, args)
    
    # Initialize the logger
    logger.init_logger(config=config)
    logger.info(f"| Logger initialized at: {config.log_path}")
    logger.info(f"| Config:\n{config.pretty_text}")
    
    processor = PROCESSOR.build(config.processor)
    
    try:
        await processor.run()
    except KeyboardInterrupt:
        sys.exit()

if __name__ == '__main__':
    asyncio.run(main())

Step 4: Run Processing Script

Execute the processing using the built-in script:

# Process HS300 data
python scripts/process/process.py --config configs/process/hs300.py

# Process SP500 data
python scripts/process/process.py --config configs/process/sp500.py

# Process DJ30 data
python scripts/process/process.py --config configs/process/dj30.py

💡 Pro Tips

  • Use the provided shell scripts in examples/process.sh for batch processing
  • Check processed data quality before training models
  • Adjust technical indicators based on your trading strategy
  • Consider data leakage when engineering features

Tutorial 3: RL Trading with PPO

Learn how to implement reinforcement learning trading using PPO algorithm. This tutorial covers the complete workflow from understanding FinWorld's core modules to training.

Step 1: Understanding FinWorld's RL Architecture

FinWorld's RL trading system consists of several key components:

  • Dataset: Loads and processes financial data
  • Environment: Simulates trading environment with rewards
  • Actor-Critic: Neural networks for policy and value estimation
  • PPO Agent: Combines actor and critic for RL training
  • Trainer: Manages the training loop and policy updates

Step 1.5: FinWorld's Registry System

FinWorld uses a registry system to manage all components. Here's how it works:

# finworld/registry.py
from mmengine.registry import Registry

# Define registries for different component types
DATASET = Registry('data', locations=['finworld.data'])
ENVIRONMENT = Registry('environment', locations=['finworld.environment'])
EMBED = Registry('embed', locations=['finworld.models'])
ENCODER = Registry('encoder', locations=['finworld.models'])
DECODER = Registry('decoder', locations=['finworld.models'])
AGENT = Registry('agent', locations=['finworld.agent'])
TRAINER = Registry('trainer', locations=['finworld.trainer'])
TASK = Registry('task', locations=['finworld.task'])

# Components are registered using decorators
@AGENT.register_module(force=True)
class PPO(nn.Module):
    # Implementation here
    pass

@TRAINER.register_module(force=True)
class PPOTradingTrainer():
    # Implementation here
    pass

# Components are built using the registry
agent = AGENT.build(agent_config)
trainer = TRAINER.build(trainer_config)

Step 2: Dataset - Loading Financial Data

FinWorld's SingleAssetDataset handles financial data loading and preprocessing:

from finworld.data.single_asset_dataset import SingleAssetDataset
from finworld.registry import DATASET

# Create dataset instance
dataset = SingleAssetDataset(
    symbol="AAPL",
    data_path="workdir/AAPL_fmp_price_1day_processed",
    history_timestamps=64,
    future_timestamps=32,
    start_timestamp="2015-01-01",
    end_timestamp="2024-12-31",
    level="1day",
    if_norm=True,
    if_use_temporal=True,
    if_use_future=True
)

# Key methods:
# - __getitem__(idx): Returns data for a specific time window
# - _load_asset_info(): Loads asset metadata
# - _init_asset_data(): Processes and normalizes data
# - _cal_time(): Calculates temporal features

Step 3: Environment - Trading Simulation

FinWorld's EnvironmentGeneralTrading simulates the trading environment:

from finworld.environment.environment_general_trading import EnvironmentGeneralTrading
from finworld.registry import ENVIRONMENT

# Create trading environment
env = EnvironmentGeneralTrading(
    dataset=dataset,
    initial_amount=100000,  # Starting capital
    transaction_cost_pct=0.001,  # 0.1% transaction cost
    history_timestamps=64,
    step_timestamps=1,
    future_timestamps=32,
    start_timestamp="2015-01-01",
    end_timestamp="2024-12-31",
    gamma=0.99  # Discount factor
)

# Key methods:
# - reset(): Initialize environment state
# - step(action): Execute action and return next state, reward, done, info
# - _init_features(): Load and process market data
# - _cal_reward(): Calculate trading rewards

Step 4: Actor Network - Policy Learning

FinWorld's Actor network learns the trading policy. Here's the actual implementation:

import torch
import torch.nn as nn
from tensordict import TensorDict
from diffusers.utils.accelerate_utils import apply_forward_hook
from einops import rearrange

from finworld.registry import EMBED, ENCODER
from finworld.models.base import Model
from finworld.models.modules.attention_pool import AttentionPool1D
from finworld.models.modules.mean_pool import MeanPool1D
from finworld.task import TaskType
from finworld.models.embed.position import SinCosPosition1DEmbed, SinCosPosition2DEmbed

class Actor(Model):
    def __init__(self,
                 task_type: str,
                 embed_config: dict,
                 encoder_config: dict,
                 action_dim: int,
                 output_dim: int,
                 pool_type: str = 'avg', # 'avg' or 'attn'
                 **kwargs):
        
        super(Actor, self).__init__()
        self.task_type = TaskType.from_string(task_type)
        self.action_dim = action_dim
        self.output_dim = output_dim

        # Build embedding layer using registry
        self.embed = EMBED.build(embed_config)

        # Position embedding based on task type
        if self.task_type == TaskType.TRADING:
            self.pos_embed = SinCosPosition1DEmbed(
                num_positions=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
                embed_dim=embed_config["output_dim"],
                num_prefix=0
            )
        elif self.task_type == TaskType.PORTFOLIO:
            self.pos_embed = SinCosPosition2DEmbed(
                num_time=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
                num_space= embed_config["num_assets"],
                embed_dim=embed_config["output_dim"],
                num_prefix=0
            )

        # Build encoder using registry
        self.encoder = ENCODER.build(encoder_config)

        # Pooling layer
        self.pool_type = pool_type
        if self.pool_type == 'avg':
            self.pool = MeanPool1D(dim=1, keepdim=False)
        elif self.pool_type == 'attn':
            self.pool = AttentionPool1D(
                in_features=encoder_config['output_dim'],
                out_features=output_dim,
                embed_dim=encoder_config['latent_dim'],
                num_heads=encoder_config['num_heads'],
            )
        
        # Final decoder layer
        self.decoder = nn.Linear(encoder_config['output_dim'], output_dim)
        
        self.initialize_weights()
        
    @apply_forward_hook
    def encode(self, x: TensorDict):
        """Encode input features through embedding and encoder"""
        x = self.embed(x)
        
        # Add positional embedding
        pos = self.pos_embed(x)
        x = x + pos

        # Process through encoder
        x, _, _ = self.encoder(x)
        return x
    
    @apply_forward_hook
    def decode(self, x: TensorDict):
        """Decode encoded features to action logits"""
        x = self.pool(x)
        x = self.decoder(x)
        return x
        
    def forward(self, x: TensorDict):
        """Complete forward pass"""
        batch_size = x.batch_size
        
        # Handle multi-environment case
        if len(batch_size) == 2:
            x = TensorDict(
                {
                    key: rearrange(value, 'b e ... -> (b e) ...') for key, value in x.items()
                },
                batch_size=batch_size[0] * batch_size[1]
            )
            
        x = self.encode(x)
        x = self.decode(x)
        
        # Reshape back for multi-environment case
        if len(batch_size) == 2:
            x = rearrange(x, '(b e) ... -> b e ...', b=batch_size[0])
            
        return x

Step 5: Critic Network - Value Estimation

FinWorld's Critic network estimates state values. Here's the actual implementation:

import torch
import torch.nn as nn
from tensordict import TensorDict
from diffusers.utils.accelerate_utils import apply_forward_hook
from einops import rearrange

from finworld.registry import EMBED, ENCODER
from finworld.models.base import Model
from finworld.models.modules.attention_pool import AttentionPool1D
from finworld.models.modules.mean_pool import MeanPool1D
from finworld.models.embed import SparseEmbed, DenseLinearEmbed
from finworld.task import TaskType
from finworld.models.embed.position import SinCosPosition2DEmbed, SinCosPosition1DEmbed

class Critic(Model):
    def __init__(self,
                 task_type: str,
                 embed_config: dict,
                 encoder_config: dict,
                 action_dim: int,
                 output_dim: int,
                 pool_type: str = 'avg',  # 'avg' or 'attn'
                 **kwargs):
        
        super(Critic, self).__init__()
        self.task_type = TaskType.from_string(task_type)

        # Build embedding layer using registry
        self.embed = EMBED.build(embed_config)

        # Position embedding based on task type
        if self.task_type == TaskType.TRADING:
            self.pos_embed = SinCosPosition1DEmbed(
                num_positions=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
                embed_dim=embed_config["output_dim"],
                num_prefix=0
            )
        elif self.task_type == TaskType.PORTFOLIO:
            self.pos_embed = SinCosPosition2DEmbed(
                num_time=embed_config["history_timestamps"] // embed_config["patch_timestamps"],
                num_space= embed_config["num_assets"],
                embed_dim=embed_config["output_dim"],
            )

        # Build encoder using registry
        self.encoder = ENCODER.build(encoder_config)

        # Pooling layer
        if pool_type == 'avg':
            self.pool = MeanPool1D(dim=1, keepdim=False)
        elif pool_type == 'attn':
            self.pool = AttentionPool1D(
                in_features=encoder_config['output_dim'],
                out_features=encoder_config['output_dim'],
                embed_dim=encoder_config['latent_dim'],
                num_heads=encoder_config['num_heads'],
            )

        # Final decoder layer
        self.decoder = nn.Linear(encoder_config['output_dim'], output_dim)
        
        # Action encoder for action-value functions
        if self.task_type == TaskType.TRADING:
            self.action_encoder = SparseEmbed(
                num_embeddings=action_dim,
                output_dim=encoder_config['output_dim'],
            )
        elif self.task_type == TaskType.PORTFOLIO:
            self.action_encoder = DenseLinearEmbed(
                input_dim=action_dim,
                output_dim=encoder_config['output_dim'],
            )
            
        # Attention fusion for action-value
        self.attn_fusion = nn.MultiheadAttention(
                embed_dim=encoder_config['output_dim'],
                num_heads=encoder_config['num_heads'],
                batch_first=True,
            )
        
        self.initialize_weights()
        
    @apply_forward_hook
    def encode(self, x: TensorDict):
        """Encode input features through embedding and encoder"""
        x = self.embed(x)

        # Add positional embedding
        pos = self.pos_embed(x)
        x = x + pos

        # Process through encoder
        x, _, _ = self.encoder(x)
        return x
    
    @apply_forward_hook
    def decode(self, x: torch.Tensor, a: torch.Tensor = None):
        """Decode encoded features to value estimate"""
        x = self.pool(x)

        # Optionally fuse with action information
        if a is not None:
            a = self.action_encoder(a)
            x = self.attn_fusion(x, a, a)[0]

        x = self.decoder(x)
        x = x.squeeze(-1)
        return x
        
    def forward(self, x: TensorDict, a: torch.Tensor = None):
        """Complete forward pass"""
        batch_size = x.batch_size
        
        # Handle multi-environment case
        if len(batch_size) == 2:
            x = TensorDict(
                {
                    key: rearrange(value, 'b e ... -> (b e) ...') for key, value in x.items()
                },
                batch_size=batch_size[0] * batch_size[1]
            )
            
            if a is not None:
                a = rearrange(a, 'b e ... -> (b e) ...')
            
        x = self.encode(x)
        x = self.decode(x, a)
        
        # Reshape back for multi-environment case
        if len(batch_size) == 2:
            x = rearrange(x, '(b e) ... -> b e ...', b=batch_size[0])
            
        return x

Step 6: PPO Agent - Combining Actor and Critic

FinWorld's PPO agent combines actor and critic for RL training. Here's the actual implementation:

import torch
import torch.nn as nn
from functools import partial
from einops import rearrange
from torch.distributions.categorical import Categorical
from typing import Tuple
from tensordict import TensorDict
from torch import Tensor
import torch.nn.functional as F
from torch.distributions import Dirichlet

from finworld.registry import AGENT
from finworld.models.rl.actor import Actor
from finworld.models.rl.critic import Critic
from finworld.task import TaskType

@AGENT.register_module(force=True)
class PPO(nn.Module):
    def __init__(self,
                 *args,
                 task_type: str,
                 embed_config: dict,
                 encoder_config: dict,
                 action_dim: int,
                 actor_output_dim: int,
                 critic_output_dim: int,
                 **kwargs
                 ):

        super(PPO, self).__init__()
        
        self.task_type = TaskType.from_string(task_type)
        self.action_dim = action_dim
        self.actor_output_dim = actor_output_dim
        self.critic_output_dim = critic_output_dim

        # Create actor network
        self.actor = Actor(
            task_type=task_type,
            embed_config=embed_config,
            encoder_config=encoder_config,
            action_dim=action_dim,
            output_dim=actor_output_dim
        )
        
        # Create critic network
        self.critic = Critic(
            task_type=task_type,
            embed_config=embed_config,
            encoder_config=encoder_config,
            action_dim=action_dim,
            output_dim=critic_output_dim
        )

    def get_value(self, x: TensorDict, a: Tensor = None):
        """Get state value estimate"""
        return self.critic(x, a)

    def get_action(self, x: TensorDict):
        """Sample action from policy"""
        batch_size = x.batch_size
        
        # Get action logits from actor
        logits = self.actor(x)
        
        # Create categorical distribution
        dist = Categorical(logits=logits)
        
        # Sample action
        action = dist.sample()
        
        # Get log probability
        log_prob = dist.log_prob(action)
        
        return action, log_prob

    def get_action_and_value(self, x: TensorDict):
        """Get both action and value in one forward pass"""
        batch_size = x.batch_size
        
        # Get action logits from actor
        logits = self.actor(x)
        
        # Create categorical distribution
        dist = Categorical(logits=logits)
        
        # Sample action
        action = dist.sample()
        
        # Get log probability
        log_prob = dist.log_prob(action)
        
        # Get value estimate
        value = self.critic(x, action)
        
        return action, log_prob, value

    def evaluate_actions(self, x: TensorDict, action: Tensor):
        """Evaluate actions for policy updates"""
        batch_size = x.batch_size
        
        # Get action logits from actor
        logits = self.actor(x)
        
        # Create categorical distribution
        dist = Categorical(logits=logits)
        
        # Get log probability of given actions
        log_prob = dist.log_prob(action)
        
        # Get entropy
        entropy = dist.entropy()
        
        # Get value estimate
        value = self.critic(x, action)
        
        return log_prob, value, entropy

    def forward(self, *input, **kwargs):
        """Forward pass - not used in PPO"""
        pass

Step 7: PPO Trainer - Training Loop

FinWorld's PPOTradingTrainer manages the training process. Here's the actual implementation:

import torch
from torch import nn
import gym
import torch.optim as optim
import time
import numpy as np
import os
from copy import deepcopy
from tensordict import TensorDict
from einops import rearrange

from finworld.registry import TRAINER
from finworld.registry import ENVIRONMENT
from finworld.environment import make_env
from finworld.utils import build_storage, TradingRecords
from finworld.log import logger

@TRAINER.register_module(force=True)
class PPOTradingTrainer():
    def __init__(self,
                 *args,
                 config = None,
                 dataset = None,
                 agent = None,
                 metrics = None,
                 device = None,
                 dtype = None,
                 **kwargs):
        
        # Build training environments
        train_environment_config = deepcopy(config.train_environment)
        train_environment_config.update({"dataset": dataset})
        self.train_environment = ENVIRONMENT.build(train_environment_config)

        # Build validation environments
        valid_environment_config = deepcopy(config.valid_environment)
        valid_environment_config.update({"dataset": dataset})
        self.valid_environment = ENVIRONMENT.build(valid_environment_config)

        # Build test environments
        test_environment_config = deepcopy(config.test_environment)
        test_environment_config.update({"dataset": dataset})
        self.test_environment = ENVIRONMENT.build(test_environment_config)

        # Create vectorized environments
        env_name = self.train_environment.__class__.__name__
        self.train_environments = gym.vector.AsyncVectorEnv([
            make_env(env_name, env_params=dict(env=deepcopy(self.train_environment),
                                               transition_shape=config.transition_shape, 
                                               seed=config.seed + i)) 
            for i in range(config.num_envs)
        ])

        # Initialize agent and optimizers
        self.agent = agent
        self.policy_optimizer = optim.Adam(agent.actor.parameters(), lr=config.policy_learning_rate)
        self.value_optimizer = optim.Adam(agent.critic.parameters(), lr=config.value_learning_rate)
        
        # Training configuration
        self.config = config
        self.device = device
        self.dtype = dtype
        self.metrics = metrics
        
        # Storage for experience
        self.storage = build_storage(config.transition_shape, config.batch_size)

    def explore_environment(self, init_state, init_info, reset=False):
        """Collect experience from environment"""
        if reset:
            state, info = self.train_environments.reset()
        else:
            state, info = init_state, init_info
            
        # Collect experience for num_steps
        for step in range(self.config.num_steps):
            # Get action from agent
            with torch.no_grad():
                action, log_prob, value = self.agent.get_action_and_value(state)
            
            # Execute action in environment
            next_state, reward, done, truncated, info = self.train_environments.step(action)
            
            # Store experience
            self.storage.store({
                'features': state['features'],
                'times': state['times'],
                'policy_trading_cashes': state.get('cashes', torch.zeros_like(reward)),
                'policy_trading_positions': state.get('positions', torch.zeros_like(reward)),
                'policy_trading_actions': action,
                'policy_trading_rets': reward,
                'training_values': value,
                'training_dones': done,
                'training_actions': action,
                'training_logprobs': log_prob,
                'training_rewards': reward,
                'training_advantages': torch.zeros_like(reward),  # Will be computed later
                'training_returns': torch.zeros_like(reward)     # Will be computed later
            })
            
            state = next_state

    def update_policy(self, flat_storage, b_inds, info):
        """Update actor and critic networks using PPO"""
        # Get batch data
        batch_data = {key: flat_storage[key][b_inds] for key in flat_storage.keys()}
        
        # Compute advantages and returns
        advantages = self.compute_advantages(batch_data)
        returns = advantages + batch_data['training_values']
        
        # Normalize advantages
        if self.config.norm_adv:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        # Update for multiple epochs
        for epoch in range(self.config.update_epochs):
            # Get new action probabilities
            new_log_prob, new_value, entropy = self.agent.evaluate_actions(
                TensorDict({
                    'features': batch_data['features'],
                    'times': batch_data['times'],
                    'cashes': batch_data['policy_trading_cashes'],
                    'positions': batch_data['policy_trading_positions'],
                    'rets': batch_data['policy_trading_rets'],
                    'actions': batch_data['training_actions']
                }), 
                batch_data['training_actions']
            )
            
            # Compute policy loss
            ratio = torch.exp(new_log_prob - batch_data['training_logprobs'])
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.config.clip_ratio, 1 + self.config.clip_ratio) * advantages
            policy_loss = -torch.min(surr1, surr2).mean()
            
            # Compute value loss
            if self.config.clip_vloss:
                value_pred_clipped = batch_data['training_values'] + torch.clamp(
                    new_value - batch_data['training_values'], 
                    -self.config.clip_ratio, 
                    self.config.clip_ratio
                )
                value_losses = (new_value - returns).pow(2)
                value_losses_clipped = (value_pred_clipped - returns).pow(2)
                value_loss = torch.max(value_losses, value_losses_clipped).mean()
            else:
                value_loss = (new_value - returns).pow(2).mean()
            
            # Compute entropy loss
            entropy_loss = entropy.mean()
            
            # Total loss
            total_loss = policy_loss + self.config.value_loss_coef * value_loss - self.config.entropy_coef * entropy_loss
            
            # Update actor
            self.policy_optimizer.zero_grad()
            policy_loss.backward(retain_graph=True)
            torch.nn.utils.clip_grad_norm_(self.agent.actor.parameters(), self.config.max_grad_norm)
            self.policy_optimizer.step()
            
            # Update critic
            self.value_optimizer.zero_grad()
            value_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.agent.critic.parameters(), self.config.max_grad_norm)
            self.value_optimizer.step()
            
            # Log metrics
            info.update({
                'policy_loss': policy_loss.item(),
                'value_loss': value_loss.item(),
                'entropy_loss': entropy_loss.item(),
                'total_loss': total_loss.item()
            })

    def train(self):
        """Main training loop"""
        start_time = time.time()
        
        # Reset environments
        state, info = self.train_environments.reset()
        
        # Calculate number of updates
        num_updates = self.config.total_steps // self.config.batch_size
        warm_up_updates = self.config.warm_up_steps // self.config.batch_size
        
        is_warmup = True
        for update in range(1, num_updates + 1 + warm_up_updates):
            if is_warmup and update > warm_up_updates:
                is_warmup = False
            
            # Learning rate annealing
            if self.config.anneal_lr and not is_warmup:
                frac = 1.0 - (update - 1.0 - warm_up_updates) / num_updates
                self.policy_optimizer.param_groups[0]["lr"] = frac * self.config.policy_learning_rate
                self.value_optimizer.param_groups[0]["lr"] = frac * self.config.value_learning_rate
            
            # Collect experience
            self.explore_environment(init_state=state, init_info=info, reset=False)
            
            # Flatten storage
            flat_storage = self.flatten_storage(self.storage)
            b_inds = np.arange(self.config.batch_size)
            
            # Update policy
            info = self.update_policy(flat_storage, b_inds, info)
            
            # Log progress
            if update % self.config.check_steps == 0:
                logger.info(f"Update {update}/{num_updates + warm_up_updates}")
                logger.info(f"Policy Loss: {info.get('policy_loss', 0):.4f}")
                logger.info(f"Value Loss: {info.get('value_loss', 0):.4f}")
                logger.info(f"Entropy Loss: {info.get('entropy_loss', 0):.4f}")
            
            # Save checkpoint
            if update % self.config.save_steps == 0:
                self.save_checkpoint(update)

    def compute_advantages(self, batch_data):
        """Compute advantages using GAE"""
        # This is a simplified version - actual implementation would use GAE
        rewards = batch_data['training_rewards']
        values = batch_data['training_values']
        dones = batch_data['training_dones']
        
        advantages = torch.zeros_like(rewards)
        last_advantage = 0
        
        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]
            
            delta = rewards[t] + self.config.gamma * next_value * (1 - dones[t]) - values[t]
            advantages[t] = last_advantage = delta + self.config.gamma * self.config.gae_lambda * last_advantage * (1 - dones[t])
        
        return advantages

    def flatten_storage(self, storage):
        """Flatten storage for batch processing"""
        # Implementation would flatten the storage structure
        return storage

    def save_checkpoint(self, update):
        """Save model checkpoint"""
        checkpoint = {
            'update': update,
            'actor_state_dict': self.agent.actor.state_dict(),
            'critic_state_dict': self.agent.critic.state_dict(),
            'policy_optimizer_state_dict': self.policy_optimizer.state_dict(),
            'value_optimizer_state_dict': self.value_optimizer.state_dict(),
        }
        torch.save(checkpoint, f"checkpoint_{update}.pth")

Step 8: Transformer Decoder - Reconstruction

FinWorld's TransformerDecoder is used for reconstruction tasks. Here's the actual implementation:

import torch
import torch.nn as nn

from finworld.registry import DECODER
from finworld.registry import EMBED
from finworld.models.decoder.base import Decoder
from finworld.models.modules import TransformerRopeBlock as Block

@DECODER.register_module(force=True)
class TransformerDecoder(Decoder):
    def __init__(self,
                 *args,
                 input_dim: int = 128,
                 latent_dim: int = 128,
                 output_dim: int = 5,
                 depth: int = 2,
                 num_heads: int = 4,
                 mlp_ratio: float = 4.0,
                 norm_layer=nn.LayerNorm,
                 cls_embed: bool = True,
                 no_qkv_bias: bool = False,
                 trunc_init: bool = False,
                 if_mask: bool = False,
                 if_remove_cls_embed=True,
                 **kwargs
                 ):
        super(TransformerDecoder, self).__init__()

        self.cls_embed = cls_embed
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.output_dim = output_dim if output_dim is not None else input_dim
        self.depth = depth
        self.num_heads = num_heads
        self.mlp_ratio = mlp_ratio
        self.norm_layer = norm_layer
        self.trunc_init = trunc_init
        self.if_mask = if_mask
        self.if_remove_cls_embed = if_remove_cls_embed

        # Linear projection to latent dimension
        self.to_latent = nn.Linear(input_dim, latent_dim)

        # CLS token for classification
        if self.cls_embed:
            self.cls_token = nn.Parameter(torch.zeros(1, 1, latent_dim))

        # Mask token for masked reconstruction
        self.mask_token = nn.Parameter(torch.zeros(1, 1, latent_dim))

        # Transformer blocks
        self.blocks = nn.ModuleList([
            Block(latent_dim,
                  num_heads,
                  mlp_ratio,
                  qkv_bias=not no_qkv_bias,
                  norm_layer=norm_layer)
            for i in range(depth)
        ])

        # Final normalization and projection
        self.norm = norm_layer(latent_dim)
        self.proj = nn.Linear(latent_dim, self.output_dim)

        self.initialize_weights()

    def forward(self,
                sample: torch.FloatTensor,
                ids_restore: torch.LongTensor = None) -> torch.FloatTensor:
        """Forward pass through the decoder"""
        
        # Project to latent dimension
        sample = self.to_latent(sample)  # (N, M, D)
        N, M, D = sample.shape

        # Handle masking if needed
        if self.if_mask and ids_restore is not None:
            L = ids_restore.shape[-1]
            num_mask = L - M
            assert num_mask >= 0, "Mask token count cannot be negative"

            # Add mask tokens
            mask_tokens = self.mask_token.repeat(N, num_mask, 1)  # (N, L-M, D)
            sample_ = torch.cat([sample, mask_tokens], dim=1)  # (N, L, D)

            # Restore original order
            sample = torch.gather(
                sample_,
                dim=1,
                index=ids_restore.unsqueeze(-1).repeat(1, 1, D)  # (N, L, D)
            )
        else:
            L = sample.shape[1]
            sample = sample.view(N, L, D)

        # Add CLS token if needed
        if self.cls_embed:
            cls_token = self.cls_token.expand(sample.shape[0], -1, -1)
            sample = torch.cat((cls_token, sample), dim=1)

        # Apply Transformer blocks
        for blk in self.blocks:
            sample = blk(sample)
        sample = self.norm(sample)

        # Remove CLS token if needed
        if self.cls_embed and self.if_remove_cls_embed:
            sample = sample[:, 1:, :]

        # Final projection
        sample = self.proj(sample)
        return sample

Step 9: Training Loop Implementation

Here's how the training loop works in FinWorld:

def train_ppo_trading():
    # Initialize components
    dataset = SingleAssetDataset(...)
    agent = PPO(...)
    trainer = PPOTradingTrainer(...)
    
    # Training loop
    for update in range(num_updates):
        # 1. Collect experience
        trainer.explore_environment()
        
        # 2. Flatten collected data
        flat_storage = trainer.flatten_storage(trainer.storage)
        
        # 3. Update policy for multiple epochs
        for epoch in range(update_epochs):
            # Sample batch
            b_inds = np.arange(batch_size)
            
            # Get batch data
            batch_data = {key: flat_storage[key][b_inds] for key in flat_storage.keys()}
            
            # Update actor and critic
            trainer.update_policy(flat_storage, b_inds, info)
            
        # 4. Log metrics and save checkpoints
        if update % log_freq == 0:
            trainer.log_metrics(update)
            
        if update % save_freq == 0:
            trainer.save_checkpoint(update)

Step 10: Environment Interaction

Understanding how the agent interacts with the trading environment:

def trading_episode(agent, env):
    # Reset environment
    state, info = env.reset()
    done = False
    total_reward = 0
    
    while not done:
        # Get action from agent
        action, log_prob, value = agent.get_action_and_value(state)
        
        # Execute action in environment
        next_state, reward, done, truncated, info = env.step(action)
        
        # Store experience
        experience = {
            'state': state,
            'action': action,
            'reward': reward,
            'value': value,
            'log_prob': log_prob,
            'done': done
        }
        
        # Update state
        state = next_state
        total_reward += reward
        
    return total_reward, experience

Step 11: Complete Training Script

Here's a complete example of training a PPO trading agent:

import torch
import numpy as np
from finworld.data.single_asset_dataset import SingleAssetDataset
from finworld.environment.environment_general_trading import EnvironmentGeneralTrading
from finworld.agent.rl.ppo import PPO
from finworld.trainer.rl.ppo_trading_trainer import PPOTradingTrainer
from finworld.registry import DATASET, ENVIRONMENT, AGENT, TRAINER

def main():
    # 1. Load dataset
    dataset = SingleAssetDataset(
        symbol="AAPL",
        data_path="workdir/AAPL_fmp_price_1day_processed",
        history_timestamps=64,
        future_timestamps=32,
        start_timestamp="2015-01-01",
        end_timestamp="2024-12-31",
        level="1day"
    )
    
    # 2. Create environment
    env = EnvironmentGeneralTrading(
        dataset=dataset,
        initial_amount=100000,
        transaction_cost_pct=0.001,
        history_timestamps=64,
        step_timestamps=1,
        future_timestamps=32
    )
    
    # 3. Create agent
    agent = PPO(
        task_type="trading",
        embed_config={
            "type": "TradingPatchEmbed",
            "dense_input_dim": 5,
            "sparse_input_dim": 4,
            "latent_dim": 64,
            "output_dim": 64,
            "start_timestamp": "2015-01-01",
            "end_timestamp": "2024-12-31",
            "patch_timestamps": 4,
            "history_timestamps": 64,
            "level": "1day",
            "if_use_sparse": True,
            "if_use_trajectory": True,
            "dropout": 0.1
        },
        encoder_config={
            "type": "TransformerEncoder",
            "input_dim": 64,
            "latent_dim": 64,
            "output_dim": 64,
            "depth": 2,
            "num_heads": 4,
        },
        action_dim=3,
        actor_output_dim=3,
        critic_output_dim=1
    )
    
    # 4. Create trainer
    trainer = PPOTradingTrainer(
        config=config,
        dataset=dataset,
        agent=agent,
        metrics=metrics,
        device=torch.device("cuda"),
        dtype=torch.float32
    )
    
    # 5. Start training
    trainer.train()

if __name__ == "__main__":
    main()

Step 12: Running the Training

Execute the training using FinWorld's built-in scripts:

# Single GPU training
python scripts/rl_trading/train.py --config configs/rl_trading/ppo/AAPL_ppo_trading.py

# Multi-GPU training
CUDA_VISIBLE_DEVICES=0,1 python scripts/rl_trading/train.py --config configs/rl_trading/ppo/AAPL_ppo_trading.py

# Using shell script
bash examples/ppo_trading.sh

💡 Pro Tips

  • Data Preparation: Ensure your data is properly processed and normalized
  • Environment Design: Tune reward functions to match your trading objectives
  • Model Architecture: Experiment with different embedding and encoder configurations
  • Training Parameters: Adjust learning rates, batch sizes, and update epochs
  • Monitoring: Use TensorBoard to track training progress and metrics
  • Evaluation: Test your trained agent on out-of-sample data

Next Steps

Now that you've completed the basic tutorials, you can explore more advanced features:

  • Advanced Models: Try transformer-based models for time series forecasting
  • Reinforcement Learning: Implement RL agents for trading and portfolio management
  • LLM Integration: Use large language models for financial reasoning
  • Multi-Asset Strategies: Develop portfolio-level trading strategies
  • Real-time Trading: Connect to live data feeds and execute trades

Check out our Examples section for more complex use cases and the API Reference for detailed documentation.