找回密码
 立即注册
首页 业界区 安全 对接印尼金融市场数据K线、实时行情与IPO新股 ...

对接印尼金融市场数据K线、实时行情与IPO新股

辗振 2025-6-1 18:36:54
一、印尼金融市场数据特点与价值

印度尼西亚作为东南亚最大经济体,其金融市场具有以下显著特点:

  • 快速增长:雅加达综合指数(JKSE)过去5年年化增长率达12%
  • 特色板块:棕榈油、煤炭、镍矿等资源类股票交易活跃
  • IPO活跃:2023年印尼IPO数量位居东南亚首位
  • 交易时段:上午9:30-12:00,下午13:30-16:00(UTC+7)
二、环境准备与基础配置

1. API密钥获取
  1. # 配置示例
  2. API_KEY = "your_indonesia_api_key"  # 通过官网申请或联系客服获取
  3. BASE_URL = "https://api.stocktv.top"
  4. INDONESIA_ID = 48  # 印尼国家代码
复制代码
2. 安装必要库
  1. pip install requests websocket-client pandas plotly python-dotenv
复制代码
3. 安全配置(推荐)
  1. from dotenv import load_dotenv
  2. import os
  3. load_dotenv()
  4. API_KEY = os.getenv('STOCKTV_API_KEY')  # 从环境变量读取
复制代码
三、K线数据专业对接方案

1. 多周期K线获取
  1. def get_idn_kline(stock_code, interval="1d", exchange="IDX", limit=100):
  2.     """
  3.     获取印尼股票K线数据
  4.     :param stock_code: 股票代码(如BBCA)
  5.     :param interval: 时间间隔(1m/5m/15m/1h/1d)
  6.     :param exchange: 交易所(IDX)
  7.     :param limit: 数据条数
  8.     """
  9.     url = f"{BASE_URL}/stock/kline"
  10.     params = {
  11.         "symbol": stock_code,
  12.         "exchange": exchange,
  13.         "interval": interval,
  14.         "countryId": INDONESIA_ID,
  15.         "limit": limit,
  16.         "key": API_KEY
  17.     }
  18.     response = requests.get(url, params=params)
  19.     data = response.json()
  20.    
  21.     # 转换为DataFrame并处理时区
  22.     df = pd.DataFrame(data['data'])
  23.     df['time'] = pd.to_datetime(df['time'], unit='ms')
  24.     df['time'] = df['time'].dt.tz_localize('UTC').dt.tz_convert('Asia/Jakarta')
  25.     return df
  26. # 获取BBCA银行日K数据
  27. bbca_kline = get_idn_kline("BBCA", interval="1d")
复制代码
2. 专业K线可视化(使用Plotly)
  1. import plotly.graph_objects as go
  2. def plot_idn_candlestick(df, title):
  3.     fig = go.Figure(data=[go.Candlestick(
  4.         x=df['time'],
  5.         open=df['open'],
  6.         high=df['high'],
  7.         low=df['low'],
  8.         close=df['close'],
  9.         increasing_line_color='green',
  10.         decreasing_line_color='red',
  11.         name='K线'
  12.     )])
  13.    
  14.     # 添加MA5均线
  15.     df['MA5'] = df['close'].rolling(5).mean()
  16.     fig.add_trace(go.Scatter(
  17.         x=df['time'],
  18.         y=df['MA5'],
  19.         name='MA5',
  20.         line=dict(color='orange', width=1)
  21.     ))
  22.    
  23.     fig.update_layout(
  24.         title=f'{title} - 印尼市场',
  25.         xaxis_title='雅加达时间(WIB)',
  26.         yaxis_title='价格(IDR)',
  27.         xaxis_rangeslider_visible=False,
  28.         template="plotly_white"
  29.     )
  30.    
  31.     fig.show()
  32. plot_idn_candlestick(bbca_kline, "BBCA银行日K线")
复制代码
四、实时行情数据对接

1. WebSocket实时数据订阅
  1. import websocket
  2. import json
  3. import threading
  4. class IDNMarketRealtime:
  5.     def __init__(self):
  6.         self.symbol_names = {
  7.             "BBCA": "Bank Central Asia",
  8.             "TLKM": "Telkom Indonesia",
  9.             "UNVR": "Unilever Indonesia"
  10.         }
  11.    
  12.     def on_message(self, ws, message):
  13.         data = json.loads(message)
  14.         
  15.         # 处理股票实时数据
  16.         if data.get('type') == 'stock':
  17.             symbol = data['symbol']
  18.             name = self.symbol_names.get(symbol, symbol)
  19.             print(f"[{data['time']}] {name}: {data['last']} "
  20.                   f"({data.get('chgPct', 0):.2f}%) 成交量: {data['volume']}")
  21.         
  22.         # 处理指数数据
  23.         elif data.get('type') == 'index':
  24.             print(f"指数更新 {data['name']}: {data['last']} "
  25.                   f"({data.get('chgPct', 0):.2f}%)")
  26.    
  27.     def start(self):
  28.         ws = websocket.WebSocketApp(
  29.             f"wss://ws-api.stocktv.top/connect?key={API_KEY}",
  30.             on_message=self.on_message,
  31.             on_open=self.on_open
  32.         )
  33.         
  34.         # 启动独立线程运行WebSocket
  35.         self.ws_thread = threading.Thread(target=ws.run_forever)
  36.         self.ws_thread.start()
  37.    
  38.     def on_open(self, ws):
  39.         # 订阅印尼龙头股和IDX指数
  40.         subscribe_msg = {
  41.             "action": "subscribe",
  42.             "countryId": INDONESIA_ID,
  43.             "symbols": list(self.symbol_names.keys()),
  44.             "indices": ["JKSE"]  # 雅加达综合指数
  45.         }
  46.         ws.send(json.dumps(subscribe_msg))
  47. # 启动实时行情服务
  48. realtime = IDNMarketRealtime()
  49. realtime.start()
复制代码
2. 实时数据存储与分析
  1. from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. from datetime import datetime
  5. Base = declarative_base()
  6. class RealtimeIDNStock(Base):
  7.     __tablename__ = 'realtime_idn_stocks'
  8.    
  9.     id = Column(Integer, primary_key=True)
  10.     symbol = Column(String(10))
  11.     name = Column(String(50))
  12.     price = Column(Float)
  13.     volume = Column(Integer)
  14.     timestamp = Column(DateTime)
  15.     created_at = Column(DateTime, default=datetime.utcnow)
  16. # 初始化数据库
  17. engine = create_engine('sqlite:///indonesia_market.db')
  18. Base.metadata.create_all(engine)
  19. Session = sessionmaker(bind=engine)
  20. def save_realtime_data(data):
  21.     session = Session()
  22.     try:
  23.         record = RealtimeIDNStock(
  24.             symbol=data['symbol'],
  25.             name=realtime.symbol_names.get(data['symbol']),
  26.             price=data['last'],
  27.             volume=data.get('volume', 0),
  28.             timestamp=datetime.fromtimestamp(data['timestamp'])
  29.         )
  30.         session.add(record)
  31.         session.commit()
  32.     except Exception as e:
  33.         print(f"数据存储失败: {e}")
  34.         session.rollback()
  35.     finally:
  36.         session.close()
  37. # 在on_message回调中添加:
  38. # save_realtime_data(data)
复制代码
五、印尼IPO新股数据深度对接

1. 获取IPO日历与详情
  1. def get_idn_ipo_list(status="upcoming"):
  2.     """
  3.     获取印尼IPO列表
  4.     :param status: upcoming(即将上市)/recent(近期上市)
  5.     """
  6.     url = f"{BASE_URL}/stock/getIpo"
  7.     params = {
  8.         "countryId": INDONESIA_ID,
  9.         "status": status,
  10.         "key": API_KEY
  11.     }
  12.     response = requests.get(url, params=params)
  13.     return response.json()
  14. # 获取即将上市的IPO
  15. upcoming_ipos = get_idn_ipo_list()
  16. print("即将上市的印尼IPO:")
  17. for ipo in upcoming_ipos['data'][:3]:
  18.     print(f"- {ipo['company']} ({ipo['symbol']})")
  19.     print(f"  行业: {ipo.get('industry', 'N/A')}")
  20.     print(f"  发行价: {ipo['ipoPrice']} IDR")
  21.     print(f"  上市日期: {ipo['date']}")
  22. # 获取近期IPO表现
  23. recent_ipos = get_idn_ipo_list("recent")
  24. print("\n近期IPO表现:")
  25. for ipo in recent_ipos['data'][:3]:
  26.     change_pct = (ipo['last'] - ipo['ipoPrice']) / ipo['ipoPrice'] * 100
  27.     print(f"- {ipo['company']}: {ipo['ipoPrice']} → {ipo['last']} "
  28.           f"({change_pct:.2f}%)")
复制代码
2. IPO数据分析可视化
  1. import plotly.express as px
  2. def analyze_idn_ipos():
  3.     ipos = get_idn_ipo_list("recent")['data']
  4.     df = pd.DataFrame(ipos)
  5.    
  6.     # 计算收益率
  7.     df['return_pct'] = (df['last'] - df['ipoPrice']) / df['ipoPrice'] * 100
  8.    
  9.     # 按行业分析
  10.     fig = px.box(df, x='industry', y='return_pct',
  11.                  title="印尼IPO分行业表现",
  12.                  labels={'return_pct':'收益率(%)', 'industry':'行业'})
  13.     fig.show()
  14.    
  15.     # 发行规模与收益率关系
  16.     fig = px.scatter(df, x='ipoValue', y='return_pct',
  17.                      hover_data=['company', 'symbol'],
  18.                      title="发行规模与收益率",
  19.                      labels={'ipoValue':'发行规模(十亿IDR)', 'return_pct':'收益率(%)'})
  20.     fig.show()
  21.    
  22.     return df
  23. ipo_analysis = analyze_idn_ipos()
复制代码
六、生产环境最佳实践

1. 错误处理与重试机制
  1. from tenacity import retry, stop_after_attempt, wait_exponential
  2. import logging
  3. logging.basicConfig(level=logging.INFO)
  4. logger = logging.getLogger(__name__)
  5. @retry(stop=stop_after_attempt(3),
  6.        wait=wait_exponential(multiplier=1, min=4, max=10),
  7.        before_sleep=lambda retry_state: logger.warning(
  8.            f"重试 {retry_state.attempt_number} 次,原因: {retry_state.outcome.exception()}")
  9.        )
  10. def safe_api_call(url, params):
  11.     try:
  12.         response = requests.get(url, params=params, timeout=10)
  13.         response.raise_for_status()
  14.         return response.json()
  15.     except requests.exceptions.RequestException as e:
  16.         logger.error(f"API请求失败: {e}")
  17.         raise
复制代码
2. 性能优化方案
  1. from functools import lru_cache
  2. import redis
  3. # 初始化Redis
  4. r = redis.Redis(host='localhost', port=6379, db=0)
  5. @lru_cache(maxsize=100)
  6. def get_stock_info_cached(symbol):
  7.     """带缓存的股票信息查询"""
  8.     cache_key = f"idn:stock:{symbol}:info"
  9.     cached = r.get(cache_key)
  10.     if cached:
  11.         return json.loads(cached)
  12.    
  13.     url = f"{BASE_URL}/stock/queryStocks"
  14.     params = {
  15.         "symbol": symbol,
  16.         "countryId": INDONESIA_ID,
  17.         "key": API_KEY
  18.     }
  19.     data = safe_api_call(url, params)
  20.     r.setex(cache_key, 3600, json.dumps(data))  # 缓存1小时
  21.     return data
  22. # 批量获取K线数据优化
  23. from concurrent.futures import ThreadPoolExecutor, as_completed
  24. def batch_get_kline(symbols, interval="1d"):
  25.     """并发获取多只股票K线数据"""
  26.     results = {}
  27.     with ThreadPoolExecutor(max_workers=5) as executor:
  28.         future_to_symbol = {
  29.             executor.submit(get_idn_kline, sym, interval): sym
  30.             for sym in symbols
  31.         }
  32.         for future in as_completed(future_to_symbol):
  33.             symbol = future_to_symbol[future]
  34.             try:
  35.                 results[symbol] = future.result()
  36.             except Exception as e:
  37.                 logger.error(f"获取{symbol}数据失败: {e}")
  38.     return results
  39. # 示例:批量获取印尼银行股数据
  40. bank_stocks = ["BBCA", "BBRI", "BMRI"]
  41. bank_data = batch_get_kline(bank_stocks)
复制代码
七、总结与资源

核心要点回顾


  • K线数据:支持从1分钟到日线的多周期获取,专业可视化方案
  • 实时行情:WebSocket低延迟连接,支持个股和指数
  • IPO数据:完整的上市日历与表现追踪,支持行业分析
扩展资源


  • 印尼证券交易所(IDX)官网
  • 印尼金融市场日历
  • StockTV完整API文档
特别提示:印尼市场有特殊的交易规则需要注意:

  • 价格波动限制(Auto Rejection):多数股票为±20%
  • 结算周期:T+2
  • 股息税:10%
  • 关注伊斯兰教法合规股票清单(特别适合伊斯兰金融投资者)

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册