Freqtrade 策略开发指南
本文档详细介绍如何编写 Freqtrade 策略代码。
目录
策略核心思想
策略的核心思想是告诉程序在何时开多、开空以及何时平仓。举例来说,可以在上涨趋势中开多并尝试在局部高点平仓,或在下跌趋势中开空并在局部低点平仓——当然这是理想化的设想,因为我们无法准确判断真正的高点或低点。
基础方法
因此策略最基础的两个方法是:
- 入场信号:
populate_entry_trend - 出场信号:
populate_exit_trend
策略示例
以下是一个简单的 RSI 做空策略示例:
import numpy as np
import pandas as pd
from pandas import DataFrame
from freqtrade.strategy import IStrategy, IntParameter
import talib.abstract as ta
class SimpleRSI(IStrategy):
"""
简单的 RSI 做空策略:
- RSI > 70 时做空
- RSI < 50 时平仓
"""
INTERFACE_VERSION = 3
# 修改时间周期为 3 分钟
timeframe = "3m"
# 启用做空功能
can_short: bool = True
# 简化 ROI 设置
minimal_roi = {
"0": 0.10 # 10% 收益就退出
}
# 止损设置
stoploss = -0.05 # 5% 止损
# 关闭追踪止损
trailing_stop = False
process_only_new_candles = True
# 关闭做多的退出信号,因为我们只做空
use_exit_signal = True
exit_profit_only = False
ignore_roi_if_entry_signal = False
startup_candle_count: int = 30
order_types = {
"entry": "limit",
"exit": "limit",
"stoploss": "market",
"stoploss_on_exchange": False
}
order_time_in_force = {
"entry": "GTC",
"exit": "GTC"
}
@property
def plot_config(self):
return {
"main_plot": {},
"subplots": {
"RSI": {
"rsi": {"color": "red"},
}
}
}
def informative_pairs(self):
return []
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""只计算 RSI 指标。"""
dataframe["rsi"] = ta.RSI(dataframe)
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""入场信号:做空当 RSI > 70。"""
dataframe["enter_long"] = 0
dataframe.loc[
(
(dataframe["rsi"] > 70) &
(dataframe["volume"] > 0)
),
"enter_short"
] = 1
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""退出信号:平空仓当 RSI < 50。"""
dataframe["exit_long"] = 0
dataframe.loc[
(
(dataframe["rsi"] < 50) &
(dataframe["volume"] > 0)
),
"exit_short"
] = 1
return dataframe核心函数详解
1. populate_indicators
这个方法用于填充技术指标:
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""只计算 RSI 指标。"""
dataframe["rsi"] = ta.RSI(dataframe)
return dataframe2. populate_entry_trend
这个方法定义入场信号:
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""入场信号:做空当 RSI > 70。"""
dataframe["enter_long"] = 0
dataframe.loc[
(
(dataframe["rsi"] > 70) &
(dataframe["volume"] > 0)
),
"enter_short"
] = 1
return dataframe3. populate_exit_trend
这个方法定义出场信号:
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""退出信号:平空仓当 RSI < 50。"""
dataframe["exit_long"] = 0
dataframe.loc[
(
(dataframe["rsi"] < 50) &
(dataframe["volume"] > 0)
),
"exit_short"
] = 1
return dataframe配置参数说明
做空配置
要启用做空功能,需要设置:
can_short: bool = True同时确保 config.json 中的交易模式为期货模式:
{
"trading_mode": "futures"
}注意:现货模式下无法做空。
策略逻辑说明
这个策略主要是用来进行做空,具体逻辑如下:
- 入场条件:3分钟K线RSI超过70时开空仓
- 出场条件:3分钟K线RSI低于50时平空仓
核心方法说明
重点是这三个方法:
populate_indicators- 用来填充指标populate_entry_trend- 用来定义入场信号populate_exit_trend- 用来定义出场信号
DataFrame 工作原理详解
关键理解:DataFrame 里数据的增加是 Freqtrade 策略的核心
Freqtrade 的工作原理
原始K线数据
↓
populate_indicators() ← 【关键】添加新列(RSI、MACD等)
↓
populate_entry_trend() ← 【关键】添加 enter_short/enter_long 列
↓
populate_exit_trend() ← 【关键】添加 exit_short/exit_long 列
↓
Freqtrade 引擎读取这些列 → 执行交易DataFrame 的演变过程
初始状态(原始K线数据)
timestamp open high low close volume
0 2025-11-06 100.5 101.0 100.2 100.8 1000
1 2025-11-06 100.8 101.5 100.5 101.2 1200
2 2025-11-06 101.2 102.0 101.0 101.8 1500经过 populate_indicators()
timestamp open high low close volume rsi
0 2025-11-06 100.5 101.0 100.2 100.8 1000 45.3
1 2025-11-06 100.8 101.5 100.5 101.2 1200 48.7
2 2025-11-06 101.2 102.0 101.0 101.8 1500 72.1 ← RSI > 70!经过 populate_entry_trend()
timestamp open high low close volume rsi enter_short
0 2025-11-06 100.5 101.0 100.2 100.8 1000 45.3 0
1 2025-11-06 100.8 101.5 100.5 101.2 1200 48.7 0
2 2025-11-06 101.2 102.0 101.0 101.8 1500 72.1 1 ← 做空信号!经过 populate_exit_trend()
timestamp open high low close volume rsi enter_short exit_short
0 2025-11-06 100.5 101.0 100.2 100.8 1000 45.3 0 1
1 2025-11-06 100.8 101.5 100.5 101.2 1200 48.7 0 1
2 2025-11-06 101.2 102.0 101.0 101.8 1500 72.1 1 0核心逻辑
# Freqtrade 就是这样读取你的信号的:
if dataframe.loc[current_row, "enter_short"] == 1:
# 执行做空
place_short_order()
if dataframe.loc[current_row, "exit_short"] == 1:
# 执行平仓
place_exit_order()就这么简单! Freqtrade 只是在读你添加到 DataFrame 里的数据。
如何扩展策略 - 添加更多列
想要更好的策略?就是添加更多的列!
例如,添加 MACD:
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 原来的 RSI
dataframe["rsi"] = ta.RSI(dataframe)
# 新增:MACD
macd = ta.MACD(dataframe)
dataframe["macd"] = macd['macd']
dataframe["macd_signal"] = macd['macdsignal']
# 新增:移动平均线
dataframe["sma_20"] = ta.SMA(dataframe, timeperiod=20)
dataframe["sma_50"] = ta.SMA(dataframe, timeperiod=50)
return dataframe现在你的 DataFrame 变成:
timestamp open close volume rsi macd macd_signal sma_20 sma_50
0 2025-11-06 100.5 100.8 1000 45.3 -0.5 -0.3 100.2 100.1
1 2025-11-06 100.8 101.2 1200 48.7 -0.4 -0.3 100.5 100.2
2 2025-11-06 101.2 101.8 1500 72.1 0.1 -0.2 100.8 100.3然后在 populate_entry_trend() 中使用这些新列:
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe.loc[
(
(dataframe["rsi"] > 70) &
(dataframe["macd"] > dataframe["macd_signal"]) & ← 使用新列!
(dataframe["close"] < dataframe["sma_20"]) & ← 使用新列!
(dataframe["volume"] > 0)
),
"enter_short"
] = 1
return dataframe完整的数据流
原始数据(OHLCV)
↓
populate_indicators()
├─ 添加 rsi 列
├─ 添加 macd 列
├─ 添加 sma_20 列
└─ 添加 sma_50 列
↓
populate_entry_trend()
├─ 读取 rsi、macd、sma_20、sma_50
└─ 添加 enter_short 列
↓
populate_exit_trend()
├─ 读取 rsi、macd 等
└─ 添加 exit_short 列
↓
【交易执行期间】
↓
adjust_trade_position() ← 【关键】动态调整仓位
├─ 读取当前盈亏 current_profit
├─ 读取当前价格 current_rate
├─ 读取技术指标状态
└─ 决定加仓/减仓/保持
↓
Freqtrade 引擎
├─ 读取 enter_short 列 → 执行做空
├─ 读取 exit_short 列 → 执行平仓
├─ 读取 adjust_trade_position 返回值 → 调整仓位
└─ 计算盈亏adjust_trade_position 在 DataFrame 流程中的位置
adjust_trade_position 与前面的方法不同,它不是在 DataFrame 中添加列,而是在交易执行期间被调用。
调用时机
# 交易流程中的调用点:
1. 入场信号触发 → 开仓
↓
2. 持仓期间 → 每个新的K线都会检查
↓
3. adjust_trade_position() ← 在这里被调用!
├─ 分析当前持仓状态
├─ 判断是否需要调整仓位
└─ 返回调整金额(正数加仓/负数减仓/None保持)
↓
4. 如果需要调整 → 执行加仓或减仓订单
↓
5. 继续持仓 → 等待下次检查
↓
6. 出场信号触发 → 平仓DataFrame 与 adjust_trade_position 的协作
虽然 adjust_trade_position 不直接修改 DataFrame,但它需要读取 DataFrame 中的数据来做决策:
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
**kwargs) -> Optional[float]:
# 1. 获取当前最新的 DataFrame 数据
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
# 2. 读取之前在 populate_indicators() 中添加的列
latest_candle = dataframe.iloc[-1]
current_rsi = latest_candle['rsi'] ← 来自 populate_indicators()
current_macd = latest_candle['macd'] ← 来自 populate_indicators()
current_sma = latest_candle['sma_20'] ← 来自 populate_indicators()
# 3. 基于这些数据做决策
if current_rsi < 30 and current_profit < -0.02:
return trade.stake_amount * 0.5 # 加仓
return None可视化的完整示例
假设有一个持仓的交易,整个过程如下:
初始状态(开仓后)
# Trade 对象状态
trade = {
'pair': 'BTC/USDT',
'stake_amount': 100.0,
'open_rate': 50000.0,
'current_profit': 0.0, # 刚开仓,盈亏为0
'orders': [entry_order]
}
# DataFrame 最新状态
latest_candle = {
'rsi': 45.2,
'macd': -0.5,
'sma_20': 49800.0,
'close': 50000.0
}第一次调整(价格下跌,RSI超卖)
# 市场变化后
trade['current_profit'] = -0.025 # 亏损2.5%
latest_candle['rsi'] = 28.5 # RSI超卖
latest_candle['close'] = 48750.0 # 价格下跌
# adjust_trade_position 被调用
def adjust_trade_position(...):
if current_rsi < 30 and current_profit < -0.02:
return 50.0 # 加仓50U
# 结果:创建加仓订单,更新 Trade 对象
trade['orders'].append(additional_order)
trade['stake_amount'] = 150.0 # 总仓位变为150U第二次调整(价格反弹,RSI正常)
# 市场继续变化
trade['current_profit'] = 0.015 # 盈利1.5%
latest_candle['rsi'] = 52.3 # RSI正常
latest_candle['close'] = 49500.0 # 价格反弹
# adjust_trade_position 再次被调用
def adjust_trade_position(...):
# RSI正常,盈利不多,不做调整
return None
# 结果:保持现状,等待下一次检查最终平仓
# 价格继续上涨,触发 exit_short 信号
latest_candle['rsi'] = 48.0 # RSI < 50,触发平仓信号
latest_candle['close'] = 51000.0 # 价格上涨
# exit_short = 1 → 执行平仓
# 所有仓位被关闭,交易结束关键理解点
| 概念 | 作用 | 与 DataFrame 的关系 |
|---|---|---|
| populate_indicators() | 添加技术指标列 | 提供决策所需的数据 |
| populate_entry_trend() | 添加入场信号列 | 决定何时开仓 |
| populate_exit_trend() | 添加出场信号列 | 决定何时平仓 |
| adjust_trade_position() | 动态仓位调整 | 读取指标数据,调整持仓大小 |
| DataFrame | 数据存储中心 | 存储所有技术指标和信号 |
| Trade 对象 | 交易状态跟踪 | 记录当前持仓信息和盈亏 |
总结: adjust_trade_position 是策略的"智能管家",它利用 DataFrame 中的技术指标数据,在持仓期间智能地调整仓位大小,实现更精细的风险管理和收益优化。
关键要点
| 概念 | 说明 |
|---|---|
| populate_indicators() | 添加技术指标列(RSI、MACD等) |
| populate_entry_trend() | 添加入场信号列(enter_short、enter_long) |
| populate_exit_trend() | 添加出场信号列(exit_short、exit_long) |
| DataFrame | 所有这些列的集合 |
| Freqtrade引擎 | 读取这些列,执行交易 |
总结: Freqtrade 的核心就是在 DataFrame 中添加各种指标列和信号列,然后 Freqtrade 读取这些列并执行交易。想要更好的策略?就是在 DataFrame 中添加更多有用的列!🚀
策略基础结构
- 基本模板
- 核心函数介绍
- 配置参数说明
高级功能
1. 动态仓位调整 - adjust_trade_position
adjust_trade_position 是 Freqtrade 提供的一个强大功能,用于动态调整现有交易的仓位大小。这个方法允许你在持仓期间根据市场条件进行加仓或减仓。
方法签名
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs) -> Optional[float]:参数详解
| 参数 | 类型 | 说明 |
|---|---|---|
trade | Trade | 当前交易对象,包含交易的所有信息 |
current_time | datetime | 当前时间 |
current_rate | float | 当前市场价格 |
current_profit | float | 当前盈亏比例(如 0.05 表示 5% 盈利) |
min_stake | Optional[float] | 最小投资金额限制 |
max_stake | float | 最大投资金额限制 |
current_entry_rate | float | 当前入场价格(用于加仓) |
current_exit_rate | float | 当前出场价格(用于减仓) |
current_entry_profit | float | 入场方向的盈亏 |
current_exit_profit | float | 出场方向的盈亏 |
**kwargs | dict | 其他额外参数 |
返回值
- 正数:加仓金额
- 负数:减仓金额
- None 或 0:不做调整
实际应用示例
1. 马丁格尔加仓策略
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
**kwargs) -> Optional[float]:
# 只在亏损时加仓
if current_profit > -0.02: # 亏损小于2%时不加仓
return None
# 计算已加仓次数
filled_orders = trade.select_filled_orders(trade.entry_side)
if len(filled_orders) >= 3: # 最多加仓3次
return None
# 计算加仓金额(递增)
stake_amount = trade.stake_amount * (len(filled_orders) + 1) * 0.5
# 检查资金限制
if stake_amount > max_stake:
stake_amount = max_stake
if min_stake and stake_amount < min_stake:
return None
return stake_amount2. 盈利加仓策略
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
**kwargs) -> Optional[float]:
# 只在盈利且趋势良好时加仓
if current_profit < 0.01: # 盈利小于1%时不加仓
return None
# 检查当前价格是否高于入场价格(上升趋势)
if current_rate <= trade.open_rate:
return None
# 计算加仓金额(固定比例)
stake_amount = trade.stake_amount * 0.3 # 每次加仓30%的原始仓位
# 资金限制检查
if stake_amount > max_stake:
stake_amount = max_stake
return stake_amount3. 动态减仓策略
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
**kwargs) -> Optional[float]:
# 盈利超过10%时减仓一半
if current_profit > 0.10:
return -trade.stake_amount * 0.5 # 减仓50%
# 亏损超过5%时减仓止损
if current_profit < -0.05:
return -trade.stake_amount # 全部平仓
return None4. 基于技术指标的仓位调整
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
**kwargs) -> Optional[float]:
# 获取当前技术指标
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
if dataframe.empty:
return None
latest_candle = dataframe.iloc[-1]
current_rsi = latest_candle['rsi']
# RSI 超卖时加仓
if current_rsi < 30 and current_profit < -0.01:
filled_orders = trade.select_filled_orders(trade.entry_side)
if len(filled_orders) < 2: # 最多加仓1次
return trade.stake_amount * 0.5
# RSI 超买时减仓
if current_rsi > 70 and current_profit > 0.05:
return -trade.stake_amount * 0.3 # 减仓30%
return None启用仓位调整功能
要在策略中启用仓位调整,需要在配置中添加:
class YourStrategy(IStrategy):
# 启用仓位调整
position_adjustment_enable = True
# 可选:设置最大仓位调整次数
max_entry_position_adjustment = 3注意事项
- 资金管理:确保加仓不会超过总资金限制
- 风险控制:设置最大加仓次数和金额限制
- 市场条件:根据市场趋势决定加减仓时机
- 手续费成本:频繁加仓会增加交易成本
- 回测验证:务必在历史数据上充分测试策略
常见应用场景
马丁格尔策略:亏损时加倍投入
金字塔加仓:盈利时逐步加仓
分批建仓:分多次建立仓位
动态止损:根据市场情况调整仓位
风险对冲:通过减仓降低风险
[ ] 动态止损
[ ] 自定义 ROI
[ ] 风险管理
日志打印方法
在策略开发中,日志打印是非常重要的调试和监控工具。Freqtrade 使用 Python 的 logging 模块来处理日志输出。
基本日志导入和配置
import logging
logger = logging.getLogger(__name__)常用日志打印方法
1. 策略启动时打印配置信息
def __init__(self, config: dict) -> None:
super().__init__(config)
# 打印策略配置
logger.info("=" * 60)
logger.info("[策略启动]")
logger.info(f" 策略名称: {self.__class__.__name__}")
logger.info(f" 时间周期: {self.timeframe}")
logger.info(f" 启用做空: {self.can_short}")
logger.info(f" 止损设置: {self.stoploss:.2%}")
logger.info("=" * 60)2. 分批建仓配置打印
def print_martingale_config(self):
"""打印分批建仓配置信息"""
logger.info("=" * 60)
logger.info("[分批建仓配置]")
logger.info(f" 加仓触发阈值: {self.martingale_trigger.value:.2%}")
logger.info(f" 最大加仓次数: {self.max_entry_positions.value}")
logger.info(f" 资金分配系数: {self.position_multiplier.value}")
logger.info(f" ROI目标: {self.roi_target.value:.2%}")
logger.info(f" 止损: {self.stoploss_value.value:.2%}")
logger.info("=" * 60)
logger.info("[资金分配说明]")
logger.info(f" 如果总预算为100U,分{self.max_entry_positions.value}次建仓:")
logger.info(f" - 初始开仓: {100/self.max_entry_positions.value:.2f}U")
logger.info(f" - 每次加仓: 根据剩余资金和系数动态计算")
logger.info(f" - 总投入: 100U (固定)")
logger.info("=" * 60)3. 交易信号打印
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# ... 策略逻辑 ...
# 打印入场信号(可选,用于调试)
entry_signals = dataframe[dataframe["enter_short"] == 1]
if len(entry_signals) > 0:
latest_signal = entry_signals.iloc[-1]
logger.info(f"[入场信号] {metadata['pair']}")
logger.info(f" 时间: {latest_signal.name}")
logger.info(f" 价格: {latest_signal['close']:.4f}")
logger.info(f" RSI: {latest_signal['rsi']:.2f}")
return dataframe4. 指标状态监控
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe["rsi"] = ta.RSI(dataframe)
# 定期打印指标状态(避免过多日志)
if len(dataframe) % 100 == 0: # 每100根K线打印一次
latest = dataframe.iloc[-1]
logger.debug(f"[指标状态] {metadata['pair']}")
logger.debug(f" RSI: {latest['rsi']:.2f}")
logger.debug(f" 价格: {latest['close']:.4f}")
return dataframe5. 风险管理日志
def custom_stoploss(self, pair: str, trade: 'Trade', current_time: datetime,
current_rate: float, current_profit: float, **kwargs) -> float:
# 打印止损状态
if current_profit < -0.02: # 亏损超过2%
logger.warning(f"[止损警告] {pair}")
logger.warning(f" 当前亏损: {current_profit:.2%}")
logger.warning(f" 当前价格: {current_rate:.4f}")
return self.stoploss6. 资金管理日志
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: Optional[float], max_stake: float,
leverage: float, entry_tag: Optional[str], side: str,
**kwargs) -> float:
# 打印资金分配
logger.info(f"[资金分配] {pair}")
logger.info(f" 建议仓位: {proposed_stake:.2f}")
logger.info(f" 杠杆倍数: {leverage:.1f}")
logger.info(f" 交易方向: {side}")
return proposed_stake日志级别说明
| 级别 | 用途 | 示例 |
|---|---|---|
logger.debug() | 调试信息,详细的数据流 | 指标计算过程 |
logger.info() | 一般信息,重要事件 | 交易信号、配置信息 |
logger.warning() | 警告信息,需要注意的情况 | 亏损警告、异常情况 |
logger.error() | 错误信息,严重问题 | 交易失败、数据错误 |
实际应用示例
class AdvancedRSI(IStrategy):
def __init__(self, config: dict) -> None:
super().__init__(config)
# 启动时打印配置
self.print_strategy_config()
def print_strategy_config(self):
"""打印策略完整配置"""
logger.info("=" * 60)
logger.info("[策略配置加载完成]")
logger.info(f" 策略类: {self.__class__.__name__}")
logger.info(f" 时间框架: {self.timeframe}")
logger.info(f" RSI周期: {self.rsi_period.value}")
logger.info(f" 超买线: {self.rsi_overbought.value}")
logger.info(f" 超卖线: {self.rsi_oversold.value}")
logger.info(f" 止损: {self.stoploss:.2%}")
logger.info(f" 最小ROI: {self.minimal_roi}")
logger.info("=" * 60)
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 策略逻辑...
# 检测并记录入场信号
signals = dataframe[dataframe["enter_long"] == 1]
if not signals.empty:
for idx, signal in signals.iterrows():
logger.info(f"[做多信号] {metadata['pair']} @ {signal['close']:.4f}")
logger.info(f" RSI: {signal['rsi']:.2f}")
logger.info(f" 时间: {idx}")
return dataframe日志配置建议
在 config.json 中配置日志级别:
{
"user_data_dir": "./user_data",
"strategy": "YourStrategy",
"log_level": "INFO",
"logfile": "user_data/logs/strategy.log"
}日志最佳实践:
- 使用有意义的日志消息
- 避免在生产环境中打印过多 DEBUG 信息
- 重要交易事件一定要记录日志
- 定期清理日志文件避免过大
实用示例
- [x] 日志打印方法
- [ ] 简单策略模板
- [ ] 常用代码片段