找回密码
 立即注册
首页 业界区 安全 使用StockTV API对接印度金融市场数据全指南:K线、实时 ...

使用StockTV API对接印度金融市场数据全指南:K线、实时行情与IPO新股

要燥 2025-6-1 18:40:33
一、印度金融市场数据特点

印度作为全球增长最快的主要经济体之一,其金融市场具有以下显著特征:

  • 双交易所体系:国家证券交易所(NSE)和孟买证券交易所(BSE)
  • 高流动性品种:Nifty 50指数成分股、银行股等
  • 独特交易机制:T+2结算制度,上午9:15至下午3:30交易时间(IST)
  • 丰富IPO市场:2023年印度IPO数量位居全球前列
二、环境配置与基础对接

1. API密钥获取与配置
  1. # 配置StockTV API
  2. API_KEY = "your_api_key_here"  # 通过官网或客服获取
  3. BASE_URL = "https://api.stocktv.top"
  4. # 印度市场特定参数
  5. INDIA_COUNTRY_ID = 14  # 印度国家代码
  6. NSE_EXCHANGE_ID = 46   # NSE交易所代码
  7. BSE_EXCHANGE_ID = 74   # BSE交易所代码
复制代码
2. 安装必要库
  1. pip install requests websocket-client pandas plotly
复制代码
三、印度K线数据专业对接

1. 多周期K线获取接口
  1. import pandas as pd
  2. def get_india_kline(symbol, exchange, interval="15m"):
  3.     """
  4.     获取印度股票K线数据
  5.     :param symbol: 股票代码(如RELIANCE)
  6.     :param exchange: 交易所(NSE/BSE)
  7.     :param interval: 时间间隔(1m/5m/15m/1h/1d)
  8.     """
  9.     url = f"{BASE_URL}/stock/kline"
  10.     params = {
  11.         "symbol": symbol,
  12.         "exchange": exchange,
  13.         "interval": interval,
  14.         "countryId": INDIA_COUNTRY_ID,
  15.         "key": API_KEY
  16.     }
  17.     response = requests.get(url, params=params)
  18.     data = response.json()
  19.    
  20.     # 转换为Pandas DataFrame
  21.     df = pd.DataFrame(data['data'])
  22.     df['time'] = pd.to_datetime(df['time'], unit='ms')  # 转换印度时区(IST)
  23.     df['time'] = df['time'].dt.tz_localize('UTC').dt.tz_convert('Asia/Kolkata')
  24.     return df
  25. # 获取Reliance Industries的15分钟K线(NSE)
  26. reliance_kline = get_india_kline("RELIANCE", "NSE", "15m")
复制代码
2. 专业级K线可视化
  1. import plotly.graph_objects as go
  2. def plot_advanced_kline(df):
  3.     fig = go.Figure(data=[go.Candlestick(
  4.         x=df['time'],
  5.         open=df['open'],
  6.         high=df['high'],
  7.         low=df['low'],
  8.         close=df['close'],
  9.         increasing_line_color='green',
  10.         decreasing_line_color='red'
  11.     )])
  12.    
  13.     fig.update_layout(
  14.         title='印度股票K线图',
  15.         xaxis_title='印度标准时间(IST)',
  16.         yaxis_title='价格(INR)',
  17.         xaxis_rangeslider_visible=False,
  18.         template="plotly_dark"
  19.     )
  20.    
  21.     # 添加成交量柱状图
  22.     fig.add_trace(go.Bar(
  23.         x=df['time'],
  24.         y=df['volume'],
  25.         name='成交量',
  26.         marker_color='rgba(100, 100, 255, 0.6)',
  27.         yaxis='y2'
  28.     ))
  29.    
  30.     fig.update_layout(yaxis2=dict(
  31.         title='成交量',
  32.         overlaying='y',
  33.         side='right'
  34.     ))
  35.    
  36.     fig.show()
  37. plot_advanced_kline(reliance_kline)
复制代码
四、印度市场实时数据对接

1. WebSocket实时行情订阅
  1. import websocket
  2. import json
  3. import threading
  4. class IndiaMarketData:
  5.     def __init__(self):
  6.         self.symbol_map = {}  # 存储symbol到股票名称的映射
  7.    
  8.     def on_message(self, ws, message):
  9.         data = json.loads(message)
  10.         
  11.         # 处理实时行情更新
  12.         if data.get('type') == 'stock':
  13.             symbol = data['symbol']
  14.             print(f"实时行情 {self.symbol_map.get(symbol, symbol)}: "
  15.                   f"最新价 {data['last']} 成交量 {data['volume']}")
  16.             
  17.         # 处理指数更新
  18.         elif data.get('type') == 'index':
  19.             print(f"指数更新 {data['name']}: {data['last']} ({data['chgPct']}%)")
  20.    
  21.     def subscribe_symbols(self, ws):
  22.         # 订阅Nifty 50成分股(示例)
  23.         nifty_stocks = ["RELIANCE", "TCS", "HDFCBANK", "INFY"]
  24.         for symbol in nifty_stocks:
  25.             self.symbol_map[symbol] = get_stock_name(symbol)
  26.         
  27.         # 订阅请求
  28.         subscribe_msg = {
  29.             "action": "subscribe",
  30.             "countryId": INDIA_COUNTRY_ID,
  31.             "symbols": nifty_stocks,
  32.             "indices": ["NSEI"]  # Nifty 50指数
  33.         }
  34.         ws.send(json.dumps(subscribe_msg))
  35.    
  36.     def start(self):
  37.         ws = websocket.WebSocketApp(
  38.             f"wss://ws-api.stocktv.top/connect?key={API_KEY}",
  39.             on_message=self.on_message,
  40.             on_open=lambda ws: self.subscribe_symbols(ws)
  41.         )
  42.         
  43.         # 启动WebSocket连接
  44.         wst = threading.Thread(target=ws.run_forever)
  45.         wst.start()
  46. # 辅助函数:获取股票名称
  47. def get_stock_name(symbol):
  48.     url = f"{BASE_URL}/stock/queryStocks"
  49.     params = {
  50.         "symbol": symbol,
  51.         "countryId": INDIA_COUNTRY_ID,
  52.         "key": API_KEY
  53.     }
  54.     response = requests.get(url, params=params)
  55.     return response.json()['data'][0]['name']
  56. # 启动实时数据服务
  57. india_data = IndiaMarketData()
  58. india_data.start()
复制代码
2. 实时数据存储方案
  1. from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. from datetime import datetime
  5. Base = declarative_base()
  6. class RealTimeData(Base):
  7.     __tablename__ = 'india_realtime_data'
  8.    
  9.     id = Column(Integer, primary_key=True)
  10.     symbol = Column(String(20))
  11.     exchange = Column(String(10))
  12.     last_price = Column(Float)
  13.     volume = Column(Integer)
  14.     timestamp = Column(DateTime)
  15.     created_at = Column(DateTime, default=datetime.utcnow)
  16. # 初始化数据库连接
  17. engine = create_engine('sqlite:///india_market.db')
  18. Base.metadata.create_all(engine)
  19. Session = sessionmaker(bind=engine)
  20. def save_realtime_data(data):
  21.     session = Session()
  22.     try:
  23.         record = RealTimeData(
  24.             symbol=data['symbol'],
  25.             exchange=data.get('exchange', 'NSE'),
  26.             last_price=data['last'],
  27.             volume=data['volume'],
  28.             timestamp=datetime.fromtimestamp(data['timestamp'])
  29.         )
  30.         session.add(record)
  31.         session.commit()
  32.     except Exception as e:
  33.         print(f"保存数据失败: {e}")
  34.         session.rollback()
  35.     finally:
  36.         session.close()
  37. # 在on_message回调中调用
  38. # save_realtime_data(data)
复制代码
五、印度IPO新股数据深度对接

1. 获取IPO日历与详情
  1. def get_india_ipo_list(status="upcoming"):
  2.     """
  3.     获取印度IPO列表
  4.     :param status: upcoming(即将上市)/recent(近期上市)
  5.     """
  6.     url = f"{BASE_URL}/stock/getIpo"
  7.     params = {
  8.         "countryId": INDIA_COUNTRY_ID,
  9.         "status": status,
  10.         "key": API_KEY
  11.     }
  12.     response = requests.get(url, params=params)
  13.     return response.json()
  14. # 获取即将上市的IPO
  15. upcoming_ipos = get_india_ipo_list("upcoming")
  16. print("即将上市的IPO:")
  17. for ipo in upcoming_ipos['data'][:5]:
  18.     print(f"{ipo['company']} ({ipo['symbol']}) - 发行价: ₹{ipo['ipoPrice']}")
  19. # 获取近期上市的IPO表现
  20. recent_ipos = get_india_ipo_list("recent")
  21. print("\n近期IPO表现:")
  22. for ipo in recent_ipos['data'][:5]:
  23.     change = (ipo['last'] - ipo['ipoPrice']) / ipo['ipoPrice'] * 100
  24.     print(f"{ipo['company']}: 发行价 ₹{ipo['ipoPrice']} → 当前 ₹{ipo['last']} ({change:.2f}%)")
复制代码
2. IPO数据分析与可视化
  1. import plotly.express as px
  2. def analyze_ipo_performance():
  3.     # 获取过去6个月的IPO数据
  4.     ipos = get_india_ipo_list("recent")['data']
  5.     df = pd.DataFrame(ipos)
  6.    
  7.     # 计算首日/首周涨跌幅
  8.     df['listing_gain'] = (df['listingPrice'] - df['ipoPrice']) / df['ipoPrice'] * 100
  9.     df['current_gain'] = (df['last'] - df['ipoPrice']) / df['ipoPrice'] * 100
  10.    
  11.     # 绘制散点图
  12.     fig = px.scatter(df, x='listing_gain', y='current_gain',
  13.                      hover_data=['company', 'symbol'],
  14.                      title="印度IPO表现分析",
  15.                      labels={'listing_gain':'首日涨幅(%)', 'current_gain':'当前涨幅(%)'})
  16.    
  17.     # 添加参考线
  18.     fig.add_hline(y=0, line_dash="dash")
  19.     fig.add_vline(x=0, line_dash="dash")
  20.    
  21.     fig.show()
  22.    
  23.     return df
  24. ipo_analysis = analyze_ipo_performance()
复制代码
六、生产环境最佳实践

1. 错误处理与重试机制
  1. from tenacity import retry, stop_after_attempt, wait_exponential
  2. import logging
  3. logging.basicConfig(level=logging.INFO)
  4. logger = logging.getLogger(__name__)
  5. @retry(stop=stop_after_attempt(3),
  6.        wait=wait_exponential(multiplier=1, min=4, max=10),
  7.        before_sleep=lambda retry_state: logger.warning(
  8.            f"重试 {retry_state.attempt_number} 次,原因: {retry_state.outcome.exception()}")
  9.        )
  10. def safe_api_call(url, params):
  11.     try:
  12.         response = requests.get(url, params=params, timeout=10)
  13.         response.raise_for_status()
  14.         return response.json()
  15.     except requests.exceptions.RequestException as e:
  16.         logger.error(f"API请求失败: {e}")
  17.         raise
复制代码
2. 性能优化方案
  1. import redis
  2. from functools import lru_cache
  3. # 初始化Redis连接
  4. r = redis.Redis(host='localhost', port=6379, db=0)
  5. @lru_cache(maxsize=100)
  6. def get_stock_info(symbol):
  7.     """缓存股票基本信息"""
  8.     cache_key = f"stock:{symbol}:info"
  9.     cached = r.get(cache_key)
  10.     if cached:
  11.         return json.loads(cached)
  12.    
  13.     url = f"{BASE_URL}/stock/queryStocks"
  14.     params = {
  15.         "symbol": symbol,
  16.         "countryId": INDIA_COUNTRY_ID,
  17.         "key": API_KEY
  18.     }
  19.     data = safe_api_call(url, params)
  20.     r.setex(cache_key, 3600, json.dumps(data))  # 缓存1小时
  21.     return data
  22. # 批量获取K线数据优化
  23. def batch_get_kline(symbols, interval):
  24.     """批量获取K线数据,减少API调用次数"""
  25.     results = {}
  26.     with ThreadPoolExecutor(max_workers=5) as executor:
  27.         future_to_symbol = {
  28.             executor.submit(get_india_kline, sym, "NSE", interval): sym
  29.             for sym in symbols
  30.         }
  31.         for future in as_completed(future_to_symbol):
  32.             symbol = future_to_symbol[future]
  33.             try:
  34.                 results[symbol] = future.result()
  35.             except Exception as e:
  36.                 logger.error(f"获取{symbol}数据失败: {e}")
  37.     return results
复制代码
七、总结与资源

关键要点回顾


  • K线数据:支持多周期获取,专业级可视化方案
  • 实时行情:WebSocket低延迟连接,支持NSE/BSE双交易所
  • IPO数据:完整的新股上市日历与表现追踪
扩展资源


  • 印度证券交易委员会(SEBI)官网
  • NSE官方数据文档
  • StockTV完整API文档
特别提示:印度市场有特殊的节假日安排和交易规则,建议在实现中考虑:

  • 处理IST时区转换(UTC+5:30)
  • 关注SEBI监管政策变化
  • 对IPO锁定期等特殊规则进行额外处理

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册