一、印尼金融市场数据特点与价值
印度尼西亚作为东南亚最大经济体,其金融市场具有以下显著特点:
- 快速增长:雅加达综合指数(JKSE)过去5年年化增长率达12%
- 特色板块:棕榈油、煤炭、镍矿等资源类股票交易活跃
- IPO活跃:2023年印尼IPO数量位居东南亚首位
- 交易时段:上午9:30-12:00,下午13:30-16:00(UTC+7)
二、环境准备与基础配置
1. API密钥获取
- # 配置示例
- API_KEY = "your_indonesia_api_key" # 通过官网申请或联系客服获取
- BASE_URL = "https://api.stocktv.top"
- INDONESIA_ID = 48 # 印尼国家代码
复制代码 2. 安装必要库
- pip install requests websocket-client pandas plotly python-dotenv
复制代码 3. 安全配置(推荐)
- from dotenv import load_dotenv
- import os
- load_dotenv()
- API_KEY = os.getenv('STOCKTV_API_KEY') # 从环境变量读取
复制代码 三、K线数据专业对接方案
1. 多周期K线获取
- def get_idn_kline(stock_code, interval="1d", exchange="IDX", limit=100):
- """
- 获取印尼股票K线数据
- :param stock_code: 股票代码(如BBCA)
- :param interval: 时间间隔(1m/5m/15m/1h/1d)
- :param exchange: 交易所(IDX)
- :param limit: 数据条数
- """
- url = f"{BASE_URL}/stock/kline"
- params = {
- "symbol": stock_code,
- "exchange": exchange,
- "interval": interval,
- "countryId": INDONESIA_ID,
- "limit": limit,
- "key": API_KEY
- }
- response = requests.get(url, params=params)
- data = response.json()
-
- # 转换为DataFrame并处理时区
- df = pd.DataFrame(data['data'])
- df['time'] = pd.to_datetime(df['time'], unit='ms')
- df['time'] = df['time'].dt.tz_localize('UTC').dt.tz_convert('Asia/Jakarta')
- return df
- # 获取BBCA银行日K数据
- bbca_kline = get_idn_kline("BBCA", interval="1d")
复制代码 2. 专业K线可视化(使用Plotly)
- import plotly.graph_objects as go
- def plot_idn_candlestick(df, title):
- fig = go.Figure(data=[go.Candlestick(
- x=df['time'],
- open=df['open'],
- high=df['high'],
- low=df['low'],
- close=df['close'],
- increasing_line_color='green',
- decreasing_line_color='red',
- name='K线'
- )])
-
- # 添加MA5均线
- df['MA5'] = df['close'].rolling(5).mean()
- fig.add_trace(go.Scatter(
- x=df['time'],
- y=df['MA5'],
- name='MA5',
- line=dict(color='orange', width=1)
- ))
-
- fig.update_layout(
- title=f'{title} - 印尼市场',
- xaxis_title='雅加达时间(WIB)',
- yaxis_title='价格(IDR)',
- xaxis_rangeslider_visible=False,
- template="plotly_white"
- )
-
- fig.show()
- plot_idn_candlestick(bbca_kline, "BBCA银行日K线")
复制代码 四、实时行情数据对接
1. WebSocket实时数据订阅
- import websocket
- import json
- import threading
- class IDNMarketRealtime:
- def __init__(self):
- self.symbol_names = {
- "BBCA": "Bank Central Asia",
- "TLKM": "Telkom Indonesia",
- "UNVR": "Unilever Indonesia"
- }
-
- def on_message(self, ws, message):
- data = json.loads(message)
-
- # 处理股票实时数据
- if data.get('type') == 'stock':
- symbol = data['symbol']
- name = self.symbol_names.get(symbol, symbol)
- print(f"[{data['time']}] {name}: {data['last']} "
- f"({data.get('chgPct', 0):.2f}%) 成交量: {data['volume']}")
-
- # 处理指数数据
- elif data.get('type') == 'index':
- print(f"指数更新 {data['name']}: {data['last']} "
- f"({data.get('chgPct', 0):.2f}%)")
-
- def start(self):
- ws = websocket.WebSocketApp(
- f"wss://ws-api.stocktv.top/connect?key={API_KEY}",
- on_message=self.on_message,
- on_open=self.on_open
- )
-
- # 启动独立线程运行WebSocket
- self.ws_thread = threading.Thread(target=ws.run_forever)
- self.ws_thread.start()
-
- def on_open(self, ws):
- # 订阅印尼龙头股和IDX指数
- subscribe_msg = {
- "action": "subscribe",
- "countryId": INDONESIA_ID,
- "symbols": list(self.symbol_names.keys()),
- "indices": ["JKSE"] # 雅加达综合指数
- }
- ws.send(json.dumps(subscribe_msg))
- # 启动实时行情服务
- realtime = IDNMarketRealtime()
- realtime.start()
复制代码 2. 实时数据存储与分析
- from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
- from datetime import datetime
- Base = declarative_base()
- class RealtimeIDNStock(Base):
- __tablename__ = 'realtime_idn_stocks'
-
- id = Column(Integer, primary_key=True)
- symbol = Column(String(10))
- name = Column(String(50))
- price = Column(Float)
- volume = Column(Integer)
- timestamp = Column(DateTime)
- created_at = Column(DateTime, default=datetime.utcnow)
- # 初始化数据库
- engine = create_engine('sqlite:///indonesia_market.db')
- Base.metadata.create_all(engine)
- Session = sessionmaker(bind=engine)
- def save_realtime_data(data):
- session = Session()
- try:
- record = RealtimeIDNStock(
- symbol=data['symbol'],
- name=realtime.symbol_names.get(data['symbol']),
- price=data['last'],
- volume=data.get('volume', 0),
- timestamp=datetime.fromtimestamp(data['timestamp'])
- )
- session.add(record)
- session.commit()
- except Exception as e:
- print(f"数据存储失败: {e}")
- session.rollback()
- finally:
- session.close()
- # 在on_message回调中添加:
- # save_realtime_data(data)
复制代码 五、印尼IPO新股数据深度对接
1. 获取IPO日历与详情
- def get_idn_ipo_list(status="upcoming"):
- """
- 获取印尼IPO列表
- :param status: upcoming(即将上市)/recent(近期上市)
- """
- url = f"{BASE_URL}/stock/getIpo"
- params = {
- "countryId": INDONESIA_ID,
- "status": status,
- "key": API_KEY
- }
- response = requests.get(url, params=params)
- return response.json()
- # 获取即将上市的IPO
- upcoming_ipos = get_idn_ipo_list()
- print("即将上市的印尼IPO:")
- for ipo in upcoming_ipos['data'][:3]:
- print(f"- {ipo['company']} ({ipo['symbol']})")
- print(f" 行业: {ipo.get('industry', 'N/A')}")
- print(f" 发行价: {ipo['ipoPrice']} IDR")
- print(f" 上市日期: {ipo['date']}")
- # 获取近期IPO表现
- recent_ipos = get_idn_ipo_list("recent")
- print("\n近期IPO表现:")
- for ipo in recent_ipos['data'][:3]:
- change_pct = (ipo['last'] - ipo['ipoPrice']) / ipo['ipoPrice'] * 100
- print(f"- {ipo['company']}: {ipo['ipoPrice']} → {ipo['last']} "
- f"({change_pct:.2f}%)")
复制代码 2. IPO数据分析可视化
- import plotly.express as px
- def analyze_idn_ipos():
- ipos = get_idn_ipo_list("recent")['data']
- df = pd.DataFrame(ipos)
-
- # 计算收益率
- df['return_pct'] = (df['last'] - df['ipoPrice']) / df['ipoPrice'] * 100
-
- # 按行业分析
- fig = px.box(df, x='industry', y='return_pct',
- title="印尼IPO分行业表现",
- labels={'return_pct':'收益率(%)', 'industry':'行业'})
- fig.show()
-
- # 发行规模与收益率关系
- fig = px.scatter(df, x='ipoValue', y='return_pct',
- hover_data=['company', 'symbol'],
- title="发行规模与收益率",
- labels={'ipoValue':'发行规模(十亿IDR)', 'return_pct':'收益率(%)'})
- fig.show()
-
- return df
- ipo_analysis = analyze_idn_ipos()
复制代码 六、生产环境最佳实践
1. 错误处理与重试机制
- from tenacity import retry, stop_after_attempt, wait_exponential
- import logging
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- @retry(stop=stop_after_attempt(3),
- wait=wait_exponential(multiplier=1, min=4, max=10),
- before_sleep=lambda retry_state: logger.warning(
- f"重试 {retry_state.attempt_number} 次,原因: {retry_state.outcome.exception()}")
- )
- def safe_api_call(url, params):
- try:
- response = requests.get(url, params=params, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"API请求失败: {e}")
- raise
复制代码 2. 性能优化方案
- from functools import lru_cache
- import redis
- # 初始化Redis
- r = redis.Redis(host='localhost', port=6379, db=0)
- @lru_cache(maxsize=100)
- def get_stock_info_cached(symbol):
- """带缓存的股票信息查询"""
- cache_key = f"idn:stock:{symbol}:info"
- cached = r.get(cache_key)
- if cached:
- return json.loads(cached)
-
- url = f"{BASE_URL}/stock/queryStocks"
- params = {
- "symbol": symbol,
- "countryId": INDONESIA_ID,
- "key": API_KEY
- }
- data = safe_api_call(url, params)
- r.setex(cache_key, 3600, json.dumps(data)) # 缓存1小时
- return data
- # 批量获取K线数据优化
- from concurrent.futures import ThreadPoolExecutor, as_completed
- def batch_get_kline(symbols, interval="1d"):
- """并发获取多只股票K线数据"""
- results = {}
- with ThreadPoolExecutor(max_workers=5) as executor:
- future_to_symbol = {
- executor.submit(get_idn_kline, sym, interval): sym
- for sym in symbols
- }
- for future in as_completed(future_to_symbol):
- symbol = future_to_symbol[future]
- try:
- results[symbol] = future.result()
- except Exception as e:
- logger.error(f"获取{symbol}数据失败: {e}")
- return results
- # 示例:批量获取印尼银行股数据
- bank_stocks = ["BBCA", "BBRI", "BMRI"]
- bank_data = batch_get_kline(bank_stocks)
复制代码 七、总结与资源
核心要点回顾
- K线数据:支持从1分钟到日线的多周期获取,专业可视化方案
- 实时行情:WebSocket低延迟连接,支持个股和指数
- IPO数据:完整的上市日历与表现追踪,支持行业分析
扩展资源
- 印尼证券交易所(IDX)官网
- 印尼金融市场日历
- StockTV完整API文档
特别提示:印尼市场有特殊的交易规则需要注意:
- 价格波动限制(Auto Rejection):多数股票为±20%
- 结算周期:T+2
- 股息税:10%
- 关注伊斯兰教法合规股票清单(特别适合伊斯兰金融投资者)
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |