# ptrade加AI **Repository Path**: ooooinfo/ptrade-plus ## Basic Information - **Project Name**: ptrade加AI - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-31 - **Last Updated**: 2026-01-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # PTrade 常用方法 #### 介绍 PTrade 量化策略与流程说明仓库,包含中小市值选股、涨跌停过滤、科创/北交过滤、最小市值排序等常用方法,以及流程说明 HTML 文档。 #### 常用 PTrade 方法 以下代码均**区分环境**:`is_trade()` 为 True 表示正式/实盘,用 `get_snapshot` 取当前价;为 False 表示回测,用 `get_price`。初始化时建议:`if not is_trade(): set_backtest()`。 #### 设置回测条件 回测环境下在 `initialize` 中调用 `set_backtest()`,用于设置佣金、滑点和涨跌停模式。 ```python def set_backtest(): # 设置佣金费率,最低 5 元 set_commission(commission_ratio=0.0001, min_commission=5.0, type="STOCK") # 设置固定滑点:0.03% set_fixed_slippage(fixedslippage=0.0003) set_limit_mode("UNLIMITED") ``` 调用示例:`if not is_trade(): set_backtest()`。 --- ### 一、涨停 / 跌停 **1. 获取涨跌停股票(区分环境:正式用 get_snapshot 当前价,回测用 get_price)** 本函数已做**正式环境 / 回测环境**区分:昨日收盘两环境均用 `get_price` 取历史;**当前价**在正式环境用 `get_snapshot`(实时价),回测用 `get_price` 最近一根 K 线。 ```python def get_limit_stock(context, stock_list): """获取涨跌停股票信息,返回 {'up_limit': [], 'down_limit': []}。当前价:正式环境 get_snapshot,回测 get_price。""" if not stock_list: return {'up_limit': [], 'down_limit': []} today_date = context.current_dt.strftime("%Y%m%d") st_status = get_stock_status(stock_list, 'ST') out_info = {'up_limit': [], 'down_limit': []} def get_limit_rate(stock, ST_flag=True, date=None): rate = 0.1 if stock[:2] == '68': rate = 0.2 elif stock[0] == '3' and date >= '20200824': rate = 0.2 elif stock[0] != '3' and stock[:2] != '68' and ST_flag: rate = 0.05 return rate converted_stocks, stock_mapping = [], {} for stock in stock_list: c = stock.replace('.SZ', '.XSHE').replace('.SS', '.XSHG') if '.' in stock else stock converted_stocks.append(c) stock_mapping[c] = stock # 昨日收盘:正式/回测均用 get_price 历史数据 history_data = get_price(converted_stocks, count=2, frequency='1d', fields=['close', 'volume']) if history_data is None or history_data.empty: return out_info for converted_stock, original_stock in stock_mapping.items(): try: sp = history_data[history_data.index.get_level_values('code') == converted_stock] if len(sp) < 2: continue valid = sp[sp['volume'] > 0] if len(valid) < 2: continue last_close = float(valid['close'].iloc[-2]) # 当前价:正式环境用 get_snapshot(实时),回测用 get_price 最近价 if is_trade(): try: snapshot = get_snapshot(original_stock) if snapshot and original_stock in snapshot: curr_price = float(snapshot[original_stock]['last_px']) else: curr_price = float(valid['close'].iloc[-1]) except Exception: curr_price = float(valid['close'].iloc[-1]) else: curr_price = float(valid['close'].iloc[-1]) ST_flag = st_status.get(original_stock, False) rate = get_limit_rate(original_stock, ST_flag, today_date) up_limit_price = round(last_close * (1 + rate), 2) down_limit_price = round(last_close * (1 - rate), 2) if curr_price >= up_limit_price: out_info['up_limit'].append(original_stock) if curr_price <= down_limit_price: out_info['down_limit'].append(original_stock) except Exception as e: continue return out_info ``` **2. 过滤涨停股票(持仓保留)** ```python def filter_limitup_stock(context, stock_list): """过滤涨停的股票,持仓中的股票保留""" try: limit_info = get_limit_stock(context, stock_list) up_limit_stocks = set(limit_info.get('up_limit', [])) filtered = [] for stock in stock_list: if stock in context.portfolio.positions or stock not in up_limit_stocks: filtered.append(stock) return filtered except Exception as e: return stock_list ``` **3. 过滤跌停股票(持仓保留)** ```python def filter_limitdown_stock(context, stock_list): """过滤跌停的股票,持仓中的股票保留""" try: limit_info = get_limit_stock(context, stock_list) down_limit_stocks = set(limit_info.get('down_limit', [])) filtered = [] for stock in stock_list: if stock in context.portfolio.positions or stock not in down_limit_stocks: filtered.append(stock) return filtered except Exception as e: return stock_list ``` --- ### 二、科创 / 北交 / 创业板过滤 ```python def filter_kcbj_stock(stock_list): """过滤科创板、北交所、创业板股票 规则:4/8 开头北交所,68 开头科创板,30 开头创业板 """ filtered = [] for stock in stock_list: code = stock.split('.')[0] if '.' in stock else stock if code[0] in ('4', '8') or code[:2] in ('68', '30'): continue filtered.append(stock) return filtered ``` --- ### 三、行业选股公用方法(来自 143_hj_wx) 根据行业近期表现识别强势/弱势行业,从候选股票中优先选强势行业、可补充中性行业。依赖 `get_stock_blocks(stock)` 取行业(键 `'HY'`,值为 `[(code, industry_name), ...]`),无该 API 时需替换为本地接口。 **开关**:`g.industry_selection = True` 启用行业选择;`g.dynamic_industry_selection = True` 为动态强势行业选股,`False` 为简单行业分散(最多 10 个行业)。 **1. 计算各行业近期收益率** ```python def calculate_industry_performance(context, stock_list, lookback_days=20): """计算各行业近期平均收益率,返回 {行业名: 平均收益率}。依赖 get_stock_blocks(stock)['HY']。""" industry_returns = {} end_date = context.previous_date.strftime('%Y-%m-%d') if hasattr(context.previous_date, 'strftime') else str(context.previous_date) for stock in stock_list: try: industry_info = get_stock_blocks(stock) if not industry_info or 'HY' not in industry_info or not industry_info['HY']: continue industry_name = industry_info['HY'][0][1] price_data = get_price(stock, end_date=end_date, frequency='1d', fields=['close'], count=lookback_days+1) if price_data is None or not isinstance(price_data, pd.DataFrame) or len(price_data) < 2: continue start_price = price_data['close'].iloc[0] end_price = price_data['close'].iloc[-1] stock_return = (end_price - start_price) / start_price industry_returns.setdefault(industry_name, []).append(stock_return) except Exception: continue return {ind: np.mean(ret) for ind, ret in industry_returns.items() if ret} ``` **2. 分类强势 / 弱势行业** ```python def classify_industries(industry_performance, top_ratio=0.3, bottom_ratio=0.3): """按收益率排序,取前 top_ratio 为强势行业、后 bottom_ratio 为弱势行业。返回 (strong_list, weak_list)。""" if not industry_performance: return [], [] sorted_industries = sorted(industry_performance.items(), key=lambda x: x[1], reverse=True) n = len(sorted_industries) top_n = max(1, int(n * top_ratio)) bottom_n = max(1, int(n * bottom_ratio)) strong = [ind for ind, _ in sorted_industries[:top_n]] weak = [ind for ind, _ in sorted_industries[-bottom_n:]] return strong, weak ``` **3. 动态行业选股(优先强势行业,不足补中性/弱势)** ```python def get_stock_industry(context, stock_list, min_select_multiple=2): """ 从 stock_list 中按行业表现选股:先选强势行业股票,不足则补中性行业,再不足补弱势行业。 min_select_multiple: 最少选取数量为 g.stock_num * min_select_multiple,默认 2。 """ if not stock_list: return [] stock_num = getattr(g, 'stock_num', 6) min_select = stock_num * min_select_multiple industry_performance = calculate_industry_performance(context, stock_list, lookback_days=20) if not industry_performance: return get_stock_industry_simple(context, stock_list, max_industries=10) strong_industries, weak_industries = classify_industries(industry_performance, top_ratio=0.3, bottom_ratio=0.3) if not strong_industries: return get_stock_industry_simple(context, stock_list, max_industries=10) selected = [] for stock in stock_list: try: industry_info = get_stock_blocks(stock) if not industry_info or 'HY' not in industry_info or not industry_info['HY']: continue industry_name = industry_info['HY'][0][1] if industry_name in strong_industries: selected.append(stock) elif industry_name in weak_industries: pass # 排除弱势 except Exception: continue # 不足则补中性行业 if len(selected) < min_select: for stock in stock_list: if stock in selected: continue try: industry_info = get_stock_blocks(stock) if industry_info and 'HY' in industry_info and industry_info['HY']: industry_name = industry_info['HY'][0][1] if industry_name not in strong_industries and industry_name not in weak_industries: selected.append(stock) if len(selected) >= min_select: break except Exception: continue # 仍不足则补弱势行业 if len(selected) < min_select: for stock in stock_list: if stock in selected: continue try: industry_info = get_stock_blocks(stock) if industry_info and 'HY' in industry_info and industry_info['HY']: if industry_info['HY'][0][1] in weak_industries: selected.append(stock) if len(selected) >= min_select: break except Exception: continue return selected if selected else stock_list[:10] ``` **4. 简单行业分散(最多 N 个不同行业各选 1 只)** ```python def get_stock_industry_simple(context, stock_list, max_industries=10): """从 stock_list 中按行业分散选股,最多选 max_industries 个不同行业,每行业取 1 只。""" if not stock_list: return [] selected = [] industry_list = [] for stock in stock_list: try: industry_info = get_stock_blocks(stock) if not industry_info or 'HY' not in industry_info or not industry_info['HY']: continue industry_name = industry_info['HY'][0][1] if industry_name not in industry_list: industry_list.append(industry_name) selected.append(stock) if len(industry_list) >= max_industries: break except Exception: continue return selected if selected else stock_list[:10] ``` **调用示例** ```python # initialize 中 g.industry_selection = True g.dynamic_industry_selection = True # 动态强势行业;False 则简单分散 g.stock_num = 6 # before_trading 或选股流程中:先得到市值排序后的 stock_list,再按行业筛选 stock_list = [...] # 如市值排序前 100 只 if getattr(g, 'industry_selection', False): if getattr(g, 'dynamic_industry_selection', True): stock_list = get_stock_industry(context, stock_list) else: stock_list = get_stock_industry_simple(context, stock_list, max_industries=10) # 再取前 g.stock_num 或所需只数 ``` --- ### 四、最小市值排序(流通市值升序) **方式一:基于 get_fundamentals 估值表,再按当前价算流通市值并排序(区分环境取价)** ```python # 1) 获取估值数据(需先有 g.stock_list),得到 g.df2,含 float_value, total_value, a_floats 等 # 2) 当前价:正式环境用 get_snapshot,回测用 get_price;curr_float_value = a_floats * 当前价 # 3) 按 curr_float_value 升序排序,取前 N 只 g.df2['curr_float_value'] = 0 for converted_stock, original_stock in stock_mapping.items(): if is_trade(): try: snapshot = get_snapshot(original_stock) current_price = float(snapshot[original_stock]['last_px']) if snapshot and original_stock in snapshot else None except Exception: current_price = None else: price_data = get_price(converted_stock, count=1, frequency='1d', fields=['close']) current_price = float(price_data['close'][-1]) if price_data is not None and len(price_data) > 0 else None if current_price is not None and current_price > 0: g.df2.loc[original_stock, 'curr_float_value'] = g.df2.loc[original_stock, 'a_floats'] * current_price g.df2 = g.df2[g.df2['curr_float_value'] != 0] df3 = g.df2.sort_values(by='curr_float_value') stocks = list(df3.head(g.screen_stock_count).index) ``` **方式二:直接取 valuation 按 float_value / total_value 排序(before_trading 日级)** ```python df = get_fundamentals( g.stock_list, "valuation", fields=["total_value", "a_floats", "float_value"], date=context.previous_date ).sort_values(by="float_value").head(100) stocks = list(df.head(g.screen_stock_count).index) ``` **方式三:获取流通市值 DataFrame 后排序取前 N** ```python def get_market_cap(context, stocks): """获取总市值、流通市值、总股本,用于最小市值排序""" df_valuation = pd.DataFrame() for count in range(1, 11): try: last_trade_day = str(get_trading_day(-1)).replace('-', '') df_valuation = get_fundamentals( stocks, 'valuation', fields=['total_value', 'float_value', 'total_shares'], date=last_trade_day ) if not df_valuation.empty: break except Exception: time.sleep(5) return df_valuation # 使用:按 total_value 或 float_value 升序,取前 100 只 df_valuation = get_market_cap(context, initial_list) df_valuation = df_valuation.sort_values(by=['total_value'], ascending=True) stock_list = df_valuation.index.tolist()[:100] ``` --- ### 五、公用方法 **1. 过滤停牌** ```python def filter_paused_stock(stock_list): """过滤停牌股票""" halt_status = get_stock_status(stock_list, 'HALT') return [i for i in stock_list if halt_status.get(i, True) is not True] ``` **2. 过滤 ST** ```python def filter_st_stock(stock_list): """过滤 ST 及退市标签股票""" st_status = get_stock_status(stock_list, 'ST') return [i for i in stock_list if st_status.get(i, False) is not True] ``` **3. 过滤次新股(上市不足 N 天)** ```python def filter_new_stock(context, stock_list, min_days=375): """过滤次新股,默认上市不足 375 天剔除""" yesterday = context.previous_date filtered = [] stock_infos = get_stock_info(stock_list, ['listed_date']) for key, value in stock_infos.items(): listed_date = value.get('listed_date') if listed_date is None: filtered.append(key) continue try: list_date = datetime.strptime(listed_date, '%Y-%m-%d').date() if (yesterday - list_date).days > min_days: filtered.append(key) except Exception: filtered.append(key) return filtered ``` **4. 价格四舍五入(用于涨跌停价比较)** ```python from decimal import Decimal def replace(x): """保留两位小数,用于涨跌停价计算""" return float(str(round(x, 2))) ``` **5. 调试日志(按 g.debug_mode 开关)** ```python def debug_log(message, level="info"): """根据 g.debug_mode 控制日志输出""" if getattr(g, 'debug_mode', False): if level == "info": log.info(message) elif level == "warning": log.warning(message) elif level == "error": log.error(message) ``` **5.1 日志开关(按 g.enable_log 开关,多级封装)** 在 `initialize` 中设置 `g.enable_log = True` 或 `False`,以下包装函数根据该开关决定是否输出对应级别日志;关闭时不再调用底层 `log.info` / `log.warning` 等。 ```python def log_info(msg): """根据 g.enable_log 决定是否输出 info 日志""" if getattr(g, 'enable_log', True): log.info(msg) def log_warning(msg): """根据 g.enable_log 决定是否输出 warning 日志""" if getattr(g, 'enable_log', True): log.warning(msg) def log_error(msg): """根据 g.enable_log 决定是否输出 error 日志""" if getattr(g, 'enable_log', True): log.error(msg) def log_critical(msg): """根据 g.enable_log 决定是否输出 critical 日志""" if getattr(g, 'enable_log', True): log.critical(msg) def log_debug(msg): """根据 g.enable_log 决定是否输出 debug 日志""" if getattr(g, 'enable_log', True): log.debug(msg) ``` 调用示例:策略中统一用 `log_info("xxx")`、`log_warning("xxx")` 等,通过 `g.enable_log` 一键开关日志。 ```python # initialize 中 g.enable_log = True # 开启日志;False 关闭 # 策略中调用 log_info("选股完成,候选数量: {}".format(len(stock_list))) log_warning("获取行情失败,跳过") log_error("下单失败: {}".format(str(e))) ``` **6. 检查股票是否可买** 说明:有一个**记录记录当天卖出的股票**(如 `g.sold_stocks_dates` 或 `g.sold_today`)。查询时:先看该股是否在记录中且**当前仍有持仓**,若有持仓则应**先卖出**该股,然后**等待 10 秒再执行一次后续逻辑**(再次运算)。 ```python # 当天卖出记录:在卖出时写入,如 g.sold_stocks_dates[stock] = context.current_dt.strftime("%Y-%m-%d") # 或 g.sold_today = set() # 当日卖出股票集合,卖出时 g.sold_today.add(stock) def can_buy_stock(context, stock): """检查是否可买入:若在 g.sold_stocks_dates 中,则按卖出日期判断是否可买""" if stock not in getattr(g, 'sold_stocks_dates', {}): return True, "无卖出记录,可以买入" current_date = context.current_dt.strftime("%Y-%m-%d") sell_date = g.sold_stocks_dates[stock] current_date_obj = datetime.strptime(current_date, "%Y-%m-%d") sell_date_obj = datetime.strptime(sell_date, "%Y-%m-%d") if (current_date_obj - sell_date_obj).days >= 0: del g.sold_stocks_dates[stock] return True, "可以重新买入" return False, "卖出后暂不买入" def check_sold_today_and_sell_then_retry(context, sold_today_key='sold_stocks_dates', retry_seconds=10): """ 查当天卖出记录:若某股在记录中且当前仍有持仓,则先卖出该股,等待 retry_seconds 秒后由调用方再次运算。 需在 initialize 中维护 sold_today 记录(卖出时写入),如 g.sold_stocks_dates[stock] = 当日日期。 """ import time sold = getattr(g, sold_today_key, {}) if not sold: return current_date = context.current_dt.strftime("%Y-%m-%d") to_sell = [] for stock, sell_date in list(sold.items()): if sell_date != current_date: continue pos = get_position(stock) if pos and pos.amount > 0: to_sell.append(stock) for stock in to_sell: order_target_value(stock, 0) # 或 order_sell_with_offset(context, stock, get_position(stock).amount, ...) if to_sell and retry_seconds > 0: time.sleep(retry_seconds) # 此处返回后,调用方应再次执行本段逻辑或主流程(再次运算) ``` 使用说明:在定时任务或主流程中先调用 `check_sold_today_and_sell_then_retry(context)`;若其中有卖出,会先下单并等待 10 秒,返回后**再执行一次本段逻辑或主流程**(再次运算),确保记录与持仓一致。 **7. 国九条基本面筛选(公用方法)** 依据“国九条”相关要求做基本面过滤:总市值区间、EPS>0、归母净利润>0、净利润>0、营业收入下限。参数可配置,便于在多策略中复用。 ```python def filter_by_guojiutiao(context, stock_list, min_mv=10e8, max_mv=500e8, operating_revenue=1e8): """ 国九条基本面筛选(公用方法) 条件:总市值 in (min_mv, max_mv)、eps>0、归母净利润>0、净利润>0、营业收入>operating_revenue 参数: context: 策略上下文 stock_list: 待筛选股票列表 min_mv: 最小总市值(元),默认 10 亿 max_mv: 最大总市值(元),默认 500 亿 operating_revenue: 营业收入下限(元),默认 1 亿 返回: 筛选后的股票代码列表(按总市值升序,小市值在前) """ if not stock_list: return [] try: df_all = pd.DataFrame() last_trade_day = str(context.previous_date).replace('-', '') for count in range(1, 11): try: df_valuation = get_fundamentals( stock_list, 'valuation', fields=['total_value', 'float_value', 'total_shares'], date=last_trade_day ) df_income = get_fundamentals( stock_list, 'income_statement', fields=['basic_eps', 'np_parent_company_owners', 'net_profit', 'operating_revenue'], date=last_trade_day ) df_eps = get_fundamentals(stock_list, 'eps', fields=['eps'], date=last_trade_day) if df_valuation is not None and not df_valuation.empty: df_all = df_valuation.copy() if df_income is not None and not df_income.empty: df_all = pd.merge(df_all, df_income, on='secu_code', how='inner') if df_eps is not None and not df_eps.empty: df_all = pd.merge(df_all, df_eps, on='secu_code', how='inner') if not df_all.empty: break except Exception as e: time.sleep(1) continue if df_all.empty: return [] required_cols = ['total_value', 'eps', 'np_parent_company_owners', 'net_profit', 'operating_revenue'] missing = [c for c in required_cols if c not in df_all.columns] if missing: return [] df_all = df_all[ (df_all['total_value'] > min_mv) & (df_all['total_value'] < max_mv) & (df_all['eps'] > 0) & (df_all['np_parent_company_owners'] > 0) & (df_all['net_profit'] > 0) & (df_all['operating_revenue'] > operating_revenue) ].dropna(subset=required_cols) if df_all.empty: return [] df_all = df_all.sort_values(by='total_value', ascending=True) return list(df_all.index) except Exception as e: log.warning('国九条筛选异常: {}'.format(str(e))) return [] ``` 使用示例(在 `before_trading_start` 中,国九条在市值排序前或后均可): ```python # initialize 中设置 g.enable_guojiutiao_filter = True g.min_mv = 10e8 g.max_mv = 500e8 g.operating_revenue = 1e8 # before_trading 中调用(传入 g 参数或自定义) if g.enable_guojiutiao_filter: stock_list = filter_by_guojiutiao( context, stock_list, min_mv=g.min_mv, max_mv=g.max_mv, operating_revenue=g.operating_revenue ) ``` --- ### 六、补仓功能(按只数均分余额,最后一只全余额) 参数 `top_up_count` 表示补仓只数(如 5 表示 5 只)。需要补仓时:前 N-1 只按「可用资金/只数」均分目标市值,最后一只用**当前剩余可用资金**全仓买入,避免尾数浪费。 **执行时间**:建议在 **14:30** 或 **14:50** 进行补仓(可用 `run_daily(context, your_top_up_func, time='14:30')` 或 `time='14:50'` 定时触发)。 **参数说明** - `stock_list`:待补仓的股票列表(长度应等于补仓只数,如 5 只)。 - `top_up_count`:补仓只数,如 5。 - `cash_ratio`:可用资金比例,默认 1.0;如 0.95 表示预留 5% 缓冲。 ```python def top_up_by_equal_cash(context, stock_list, top_up_count=5, cash_ratio=1.0): """ 补仓:按只数均分可用余额,前 N-1 只每只目标市值 = 可用资金/只数, 最后一只用剩余全部可用资金买入。 """ if not stock_list or len(stock_list) == 0: return n = min(top_up_count, len(stock_list)) stocks = stock_list[:n] available_cash = context.portfolio.cash * cash_ratio if available_cash <= 0: return per_value = available_cash / n # 前 N-1 只每只目标金额(均份) for i, stock in enumerate(stocks): if i < n - 1: # 前 N-1 只:按市值均份余额 order_target_value(stock, per_value) else: # 最后一只:全余额买入(用当前剩余现金) remaining_cash = context.portfolio.cash if remaining_cash > 0: order_target_value(stock, remaining_cash) ``` **使用示例** ```python # 在 initialize 中设置补仓只数,并注册 14:30 或 14:50 执行补仓 g.top_up_count = 5 run_daily(context, do_top_up, time='14:30') # 或 time='14:50' def do_top_up(context): # 先得到待补仓的 5 只股票(如当前持仓或候选列表前 5 只) stocks_to_top_up = [...] # 长度为 5 的列表 top_up_by_equal_cash(context, stocks_to_top_up, top_up_count=g.top_up_count, cash_ratio=0.95) ``` 注意:最后一只用 `context.portfolio.cash` 是在前 N-1 笔委托发出后的剩余现金;若前单未成交,实际剩余可能略多,最后一笔会尽量把剩余资金用尽。 --- ### 七、买入/卖出公用方法(区分环境取价 + 买入加几分、卖出减几分) - **区分环境取价**:当前价用 `get_current_price(context, stock)`(正式环境 get_snapshot,回测 get_price)。 - **买入加几分**:限价 = 当前价 + `add_fen * 0.01`(单位:分,如 add_fen=2 表示加 2 分)。 - **卖出减几分**:限价 = 当前价 - `sub_fen * 0.01`(如 sub_fen=2 表示减 2 分)。 **1. 买入限价、卖出限价(仅算价格,不下单)** ```python def get_limit_price_buy(context, stock, add_fen=2): """买入限价 = 当前价 + add_fen 分(区分环境取价)。返回 float 或 None。""" price = get_current_price(context, stock) if price is None or price <= 0: return None return round(price + add_fen * 0.01, 2) def get_limit_price_sell(context, stock, sub_fen=2): """卖出限价 = 当前价 - sub_fen 分(区分环境取价)。返回 float 或 None。""" price = get_current_price(context, stock) if price is None or price <= 0: return None return round(price - sub_fen * 0.01, 2) ``` **2. 按股数买入/卖出(区分环境取价 + 加几分/减几分后下单)** ```python def order_buy_with_offset(context, stock, amount, add_fen=2): """ 按股数买入:区分环境取价,限价 = 当前价 + add_fen 分,再下单。 amount: 股数(整百)。回测可用 order;正式环境可用 order_market(sid, vol, mtype, limit_p)。 """ if amount <= 0: return limit_p = get_limit_price_buy(context, stock, add_fen=add_fen) if limit_p is None: return if is_trade(): mtype = 1 if stock.startswith('6') or stock.startswith('5') else 0 order_market(stock, amount, mtype, limit_p) # 或 margin_trade(stock, amount, limit_p, mtype) else: order(stock, amount) def order_sell_with_offset(context, stock, amount, sub_fen=2): """ 按股数卖出:区分环境取价,限价 = 当前价 - sub_fen 分,再下单。 """ if amount <= 0: return limit_p = get_limit_price_sell(context, stock, sub_fen=sub_fen) if limit_p is None: return if is_trade(): mtype = 1 if stock.startswith('6') or stock.startswith('5') else 0 order_market(stock, -amount, mtype, limit_p) else: order(stock, -amount) ``` **3. 按金额买入(先算股数再按加几分限价下单)** ```python def order_buy_value_with_offset(context, stock, value, add_fen=2): """ 按金额买入:当前价取价(区分环境),限价 = 当前价 + add_fen 分,股数 = value/limit_p 整百。 """ limit_p = get_limit_price_buy(context, stock, add_fen=add_fen) if limit_p is None or limit_p <= 0 or value <= 0: return amount = int(value / limit_p / 100) * 100 if amount <= 0: return order_buy_with_offset(context, stock, amount, add_fen=add_fen) ``` **使用示例** ```python # initialize 中可配置 g.buy_add_fen = 2 # 买入加 2 分 g.sell_sub_fen = 2 # 卖出减 2 分 # 买入 order_buy_with_offset(context, stock, 200, add_fen=getattr(g, 'buy_add_fen', 2)) # 或按金额 order_buy_value_with_offset(context, stock, 5000, add_fen=getattr(g, 'buy_add_fen', 2)) # 卖出 pos = get_position(stock) if pos and pos.amount > 0: order_sell_with_offset(context, stock, pos.amount, sub_fen=getattr(g, 'sell_sub_fen', 2)) ``` **4. 分批买入、分批卖出(避免单笔数量过大)** 将总股数拆成多批,每批不超过 `batch_size`(股数整百),批与批之间可加 `batch_delay` 秒(正式环境建议加,回测可设为 0)。 ```python import time def order_buy_with_offset_batch(context, stock, total_amount, add_fen=2, batch_size=1000, batch_delay=0.5): """ 分批买入:总股数 total_amount 拆成每批最多 batch_size 股(整百), 每批用 order_buy_with_offset,正式环境可加 batch_delay 秒间隔。 """ if total_amount <= 0 or batch_size <= 0: return total_amount = int(total_amount / 100) * 100 if total_amount <= 0: return batch_size = int(batch_size / 100) * 100 if batch_size <= 0: batch_size = 100 sent = 0 while sent < total_amount: batch = min(batch_size, total_amount - sent) order_buy_with_offset(context, stock, batch, add_fen=add_fen) sent += batch if sent < total_amount and batch_delay > 0: time.sleep(batch_delay) def order_sell_with_offset_batch(context, stock, total_amount, sub_fen=2, batch_size=1000, batch_delay=0.5): """ 分批卖出:总股数拆成每批最多 batch_size 股(整百), 每批用 order_sell_with_offset,正式环境可加 batch_delay 秒间隔。 """ if total_amount <= 0 or batch_size <= 0: return total_amount = int(total_amount / 100) * 100 if total_amount <= 0: return batch_size = int(batch_size / 100) * 100 if batch_size <= 0: batch_size = 100 sent = 0 while sent < total_amount: batch = min(batch_size, total_amount - sent) order_sell_with_offset(context, stock, batch, sub_fen=sub_fen) sent += batch if sent < total_amount and batch_delay > 0: time.sleep(batch_delay) ``` **分批使用示例** ```python # initialize 中可配置 g.batch_size = 1000 # 每批最多 1000 股 g.batch_delay = 0.5 # 正式环境每批间隔 0.5 秒,回测可设为 0 # 大批量买入(如 5000 股分 5 批,每批 1000) order_buy_with_offset_batch(context, stock, 5000, add_fen=2, batch_size=1000, batch_delay=0.5) # 大批量卖出(持仓全部按批卖出) pos = get_position(stock) if pos and pos.amount > 0: order_sell_with_offset_batch(context, stock, pos.amount, sub_fen=2, batch_size=1000, batch_delay=0.5) ``` --- ### 八、回测与正式环境逻辑(正式环境用 get_snapshot) - **区分环境**:用 `is_trade()` 判断。`True` 表示正式/实盘环境,`False` 表示回测环境。 - **取价逻辑**:回测用 `get_price`(历史/模拟数据);正式环境用 `get_snapshot`(实时行情),避免在实盘用回测接口导致数据不一致。 **公用方法:按环境取当前价(单只)** ```python def get_current_price(context, stock): """ 按环境取当前价:正式环境用 get_snapshot,回测用 get_price。 返回 float 或 None(取不到时)。 """ if is_trade(): try: snapshot = get_snapshot(stock) if snapshot and stock in snapshot: return float(snapshot[stock]['last_px']) except Exception as e: log.warning("get_snapshot({}) 失败: {}".format(stock, str(e))) return None else: try: price_data = get_price(stock, count=1, frequency='1m', fields=['close']) if price_data is not None and not price_data.empty and len(price_data) > 0: return float(price_data['close'].iloc[-1]) price_data = get_price(stock, count=1, frequency='1d', fields=['close']) if price_data is not None and not price_data.empty and len(price_data) > 0: return float(price_data['close'].iloc[-1]) except Exception as e: log.warning("get_price({}) 回测取价失败: {}".format(stock, str(e))) return None ``` **公用方法:按环境取多只股票当前价** ```python def get_current_prices(context, stock_list): """ 按环境取多只当前价:正式环境用 get_snapshot,回测用 get_price。 返回 dict: {stock: price},取不到的股票不写入或为 None。 """ if not stock_list: return {} if is_trade(): try: snapshot = get_snapshot(stock_list) if snapshot: return {s: float(snapshot[s]['last_px']) for s in stock_list if s in snapshot and snapshot[s].get('last_px') is not None} except Exception as e: log.warning("get_snapshot 多只取价失败: {}".format(str(e))) return {} else: out = {} try: price_df = get_price(stock_list, count=1, frequency='1m', fields=['close']) if price_df is not None and not price_df.empty: if hasattr(price_df.index, 'levels'): # MultiIndex (code, time) for stock in stock_list: try: row = price_df.xs(stock, level='code') if row is not None and len(row) > 0: out[stock] = float(row['close'].iloc[-1]) except Exception: pass else: for stock in stock_list: try: if stock in price_df.index: out[stock] = float(price_df.loc[stock, 'close'].iloc[-1]) except Exception: pass except Exception as e: log.warning("get_price 回测多只取价失败: {}".format(str(e))) return out ``` **使用示例** ```python # 初始化时区分回测/正式(设置佣金等) if not is_trade(): set_backtest() # 回测条件 # 需要当前价时统一用公用方法 price = get_current_price(context, stock) if price and price > 0: order_target_value(stock, target_value) # 多只取价(正式环境一次 get_snapshot 即可) prices = get_current_prices(context, stock_list) ``` --- ### 九、典型调用顺序(before_trading / 选股) 1. **初始化**:`if not is_trade(): set_backtest()`(回测设置佣金等);需要当前价时用 `get_current_price` / `get_current_prices`(见第八节)。 2. 获取指数成分股或初始股票池 3. `filter_st_stock` → `filter_paused_stock` → `filter_kcbj_stock` → `filter_new_stock` 4. 获取市值数据并**按流通市值升序排序**(取价处区分环境),取前 N 只 5. 卖出前可 `filter_limitup_stock`(涨停不卖);买入前可剔除涨停、用 `filter_limitdown_stock` 剔除跌停 ---