前言
想用自己的设备看电视?有多个设备要看电视?要用一根网线即上网又看电视?
本文提供 udpxy / igmpproxy / vlan单线复用 三种方案满足你的全部需求。
不使用运营商盒子
方案一 - udpxy 组播转单拨 (推荐)
前置条件: Openwrt 路由器一台
优点: 不限设备,随时随地观看直播
缺点: 部分IPTV服务缺失(点播,回放等)
Step1: 将 IPTV 接入主路由
- 如果你有光猫超级管理员权限,可以登陆光猫设置端口vlan绑定
-
登陆光猫,查看
互联网
与IPTV
业务的VLAN ID
从上图可以看出,互联网业务是772,IPTV 是20。 -
设置端口 VLAN 绑定
这里将772绑定为10,20绑定为11,具体绑定的ID任意
-
- 如果没有,则需要另取一根网线将光猫的iptv端口连接到路由器上
Step2: 配置主路由
警告:
请务必备份你的路由器设置!
此步骤操作不当可能导致与路由器失联,需重置路由器。
将所有WAN接口绑定的设备修改为br-lan.【Internet 业务绑定ID】
,这里均设置为br-lan.10
,注意此时不要应用设置,仅保存即可 。
设置LAN接口的设备为br-lan.1
(当然也可以设置其他的数字,不要一样就行),同样此时不要应用设置,仅保存即可 。
打开接口 - 设备 - br-lan - 配置,按照下图进行设置
将IPTV接口连接到盒子上用于一会抓包(这里是eth3
)
保存并应用全部配置,稍等片刻。此时 IPTV 盒子应该可以正常使用,且 LAN 正常访问互联网。
Step3: 抓包
安装好wireshark,电脑运行如下命令,开始抓包(地址和网口根据你的情况修改)
- Linux
ssh root@192.168.100.1 tcpdump -i eth3 -U -s0 -w - | wireshark -k -i -
- Windows
ssh root@192.168.100.1 'tcpdump -i eth3 -U -s0 -w -' | /path/to/Wireshark.exe -k -i -
此时,打开盒子电源,等待启动完成后打开直播随便切几个台。
Step4: 模拟盒子登陆认证
理论上可以直接设置成一个静态地址跳过认证,不过为了避免潜在的IP冲突,不稳定性,这里还是使用模拟盒子 IPoE 认证的方法。
停止抓包,过虑 dhcp 数据包,找到并单击 dhcp discover 请求。
记录盒子 MAC 地址
、Host Name
、Vendor class identifier
。
其中 Vendor class identifier
应该是二进制数据,需要右键 -> 复制 -> As a hex stream。
新建一个名为 IPTV
的接口,按照下图配置:
并在 接口 - 设备 - br-lan.11 配置 MAC 地址。
最后,为了正常发送 hex 格式的 Vendor class identifier,需要将 /lib/netifd/proto/dhcp.sh
的 72 行
从
${vendorid:+-V "$vendorid"} \
修改为
${vendorid:+-V "" "-x 0x3c:$vendorid"} \
保存并应用设置,此时应当认证成功并为之分配了一个IP。
Step5: 安装并配置 udpxy
打开 系统 - 软件包 - 下载并安装软件包,安装 luci-app-udpxy
刷新,打开 服务 - udpxy 按照下图配置:
Bind Interface 填 br-lan.【LAN 的 VLAN ID】
Source Interface 填 br-lan.【IPTV 的 VLAN ID】
Step6: 分析组播地址并测试
在 Wireshark 中,输入 http
过滤 http 数据包,Ctrl+F 搜索 channelorderbyset_data.jsp
右键该条目,追踪流 - HTTP Stream
找到诸如 igmp://225.1.2.47:10276
的数据,将其替换为 http://<你的路由器IP>:4022/rtp/225.1.2.47:10276
即对应频道的内网单播直播地址,填入播放器即可播放。
如果你无法通过该方法获得频道列表,可以观察接收到的大量 udp 组播数据包获得频道的组播地址(此处为 225.1.2.47
)。
使用运营商盒子
一般来说,IPTV要求预埋一条从光猫到电视的专用网线。但是由于装修时未考虑该需求,只留了一根网线用于放置在客厅的路由器,导致IPTV无法安装或不得不放弃客厅的无线覆盖。这种情况下可以考虑igmproxy
或单线复用
方案。
方案二 - igmpproxy
前置条件: Openwrt 路由器一台
优点: 无需购买其他专用设备
缺点: 观看直播时出现广播风暴,影响其他网络设备的性能
如果想让运营商赠送的盒子正常使用,需要满足以下条件:
- 正常访问位于内网的服务器
- 路由器能够处理组播igmp协议
Step1: 配置路由器并模拟认证
完成方案一 Step1
到 Step4
Step2: 连通IPTV内网
添加一个静态路由,设置 10.0.0.0/8 的内网段走 IPTV 接口。
其中网关地址和子网掩码可以在盒子设置或者抓的 DHCP Offer 包的 Relay agent IP address
与 Subnet Mask
字段得到。
打开接口 - IPTV - 编辑 - 防火墙设置 - 创建新的名为 IPTV 的防火墙区域。
打开网络 - 防火墙,如图打开 IPTV 域的 IP 动态伪装(即 NAT 功能)。
并编辑 lan 区域,在 允许转发到目标区域
中添加 iptv
区域
保存并应用,配置盒子使用不带认证的 DHCP 获取 IP地址,此时插入局域网任意 LAN 口,盒子应该能正常联网,但无法观看直播(因为现在还无法加入组播)。
Step3: 启用 igmpproxy
打开 系统 - 软件包 - 下载并安装软件包,安装 igmpproxy
编辑 /etc/config/igmpproxy
config igmpproxy
option quickleave 1
config phyint
option network br-lan.11
option zone iptv # the upstream firewall zone for forward rules
option direction upstream
list altnet 0.0.0.0/0 # a description of allowed source addresses for multicast packets
config phyint
option network br-lan.1
option zone lan #the downstream firewall zone for forward rules
option direction downstream
注意根据自己情况修改 br-lan.11
、br-lan.1
、option zone iptv
重启路由器,此时盒子应当能在任意 LAN 口正常使用。
方案三 - 单线复用
前置条件: Openwrt 路由器一台、网管交换机(支持vlan)一台
优点: 对局域网其他设备影响较小,稳定性高
缺点: 另需购买一台专用设备
Step1: 配置路由器
完成方案一 Step1
到 Step2
,并按照下图设置 VLAN 交换:
其中,eth3 将连接具有vlan功能的交换机或路由器。
Step2: 配置网管交换机
将网管交换机插入IPTV的端口设置为 vlan id 为 11 的 untagged 模式,
插入其他联网设备的端口设置为 vlan id 为 1 的 untagged 模式即可。
由于具体设备不同,详细方法自行查阅设备说明书。
进阶操作
由于不同地区系统不同,以下内容仅供参考
操作前请先根据方案一
和方案二
的Step2
完成主路由配置
通过脚本模拟运营商盒子登陆,数据请求,格式转换等,可以让第三方软件如 tivimate
实时获取到最新的节目单与频道列表数据,甚至可以支持回放功能。
此处给的脚本基于河南联通,不同地区与运营商需要根据自己抓的包进行一定的修改。
具体步骤
Step1: 获取必要数据
打开之前 Wireshark 抓取的数据包,如图搜索找到登陆请求 xxxxxx/auth.jsp
,右键 -> 追踪流 -> HTTP Stream
记录 Authenticator
参数,与 Host
后的地址,记为 API_EPG_BASE
。
搜索 /getencrypttoken.jsp
,同样的操作,记录 Host
后的地址,记为 API_EAS_BASE
。
Authenticator 参数实际上是下方内容用 $
拼接起来经 3DES 加密后的 HEX 数据:
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
8位随机数 | 请求 getencrypttoken.jsp 的得到的 Encrypt Token |
账号 | 序列号 | IP | MAC | ∅ | CTC |
加密的密码一般是你的机顶盒账户密码(六位)并用字符 0 填充到二十四位,如果你不知道密码,可以打开盒子系统设置-账户中查看,或者尝试000000
(笔者所在地区的账号密码是盒子第一次启动时系统自动下发的)。
Step2: 修改脚本代码
#!/usr/bin/env python3
import random
import re
import os
import json
import time
from datetime import datetime
import requests
from urllib.parse import urlsplit, parse_qs
from Crypto.Cipher import DES3
from Crypto.Util.Padding import unpad, pad
from xml.etree.ElementTree import Element, SubElement, tostring
KEY = '000000'.ljust(24, '0') # 修改六位数字密码
AUTHENTICATOR = '' # 填写你抓取到的 Authenticator
# 下面地址根据抓的包自行修改
API_EAS_IP = '10.222.33.44'
API_EAS_BASE = 'http://10.222.33.44:8080/iptvepg/'
API_EPG_BASE = 'http://10.233.44.55:8080/iptvepg/'
UDPXY_BASE = 'http://192.168.1.1:5678/rtp/'
SERVICE_BASE = 'http://192.168.1.1:1234'
COMMON_HEADERS = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; ChromiumBrowser) AppleWebKit/534.24 (KHTML, like Gecko) Safari/534.24 SkWebKit-HA-CU',
}
os.chdir(os.path.dirname(__file__))
def auth_in():
def adjust_key_parity(key_in):
def parity_byte(key_byte):
parity = 1
for i in range(1, 8):
parity ^= (key_byte >> i) & 1
return (key_byte & 0xFE) | parity
from Crypto.Util.py3compat import bchr
from Crypto.Util.py3compat import bord
key_out = b"".join([bchr(parity_byte(bord(x))) for x in key_in])
return key_out
# ignore error: Triple DES key degenerates to single DES
DES3.adjust_key_parity = adjust_key_parity
cryptor = DES3.new(KEY, DES3.MODE_ECB)
data = cryptor.decrypt(bytes.fromhex(AUTHENTICATOR))
data = unpad(data, DES3.block_size).decode()
data = data.split('$')
# get encrypt token
headers = COMMON_HEADERS.copy().update({
'Host': 'iptvz.shangdu.com:8080',
})
res = requests.get(API_EAS_BASE + 'platform/getencrypttoken.jsp', headers=headers, params={
'UserID': data[2],
'Action': 'Login',
'TerminalFlag': 1,
'TerminalOsType': 0,
'STBID': '',
'stbtype': ''
}).text
encrypt_token = re.search(r"GetAuthInfo\('(.*)'\)", res).group(1)
# replace 8-digit random number
data[0] = str(random.randint(0, 99999999)).zfill(8)
# replace encrypt token
data[1] = encrypt_token
# auth
session = requests.Session()
session.headers.update(COMMON_HEADERS)
res = session.post(API_EPG_BASE + 'platform/auth.jsp', params={
'easip': API_EAS_IP,
'ipVersion': 4,
'networkid': 1
}, data={
'UserID': data[2],
'Authenticator': cryptor.encrypt(pad('$'.join(data).encode(), DES3.block_size)).hex().upper(),
'StbIP': data[4]
})
# convert server time to local time
serverTime = datetime.strptime(res.headers['Date'], '%a, %d %b %Y %H:%M:%S %Z')
serverExpiredTime = re.search(r"\('TokenExpiredTime', *'([^']*)'", res.text).group(1)
serverExpiredTime = datetime.strptime(serverExpiredTime, '%Y.%m.%d %H:%M:%S')
expiredTime = datetime.now() + (serverExpiredTime - serverTime)
redirect_url = re.search(r"window\.location(?:\.href)? *= *'(.*)'", res.text).group(1)
session.get(redirect_url)
redirect_url = urlsplit(redirect_url)
params = {k: v[0] for k, v in parse_qs(redirect_url.query).items()}
res = session.post(API_EPG_BASE + 'function/funcportalauth.jsp', data={
'UserToken': params['UserToken'],
'UserID': params['UserID'],
'STBID': params['STBID'],
'stbinfo': '',
'prmid': '',
'easip': params['easip'],
'networkid': params['networkid'],
'stbtype': 'E900V21E',
'drmsupplier': ''
})
assert res.headers['X-Frame-UserToken'] == params['UserToken']
# 加载首页
res = session.get(API_EPG_BASE + 'frame1442/portal.jsp?tempno=-1')
assert res.headers['X-Frame-UserToken'] == params['UserToken']
return session, expiredTime
_session = None
_session_expire = datetime(1970, 1, 1)
try:
with open('iptv.json', 'r') as f:
cache = json.load(f)
_session = requests.Session()
_session.headers.update(COMMON_HEADERS)
_session.cookies.update(cache['cookies'])
_session_expire = datetime.fromisoformat(cache['expireTime'])
except FileNotFoundError:
pass
def cached_auth_in():
global _session, _session_expire
if _session is None or datetime.now() >= _session_expire:
print('[*]', 'Cache expired, re-authenticating...')
_session, _session_expire = auth_in()
with open('iptv.json', 'w') as f:
json.dump({
'cookies': _session.cookies.get_dict(),
'expireTime': _session_expire.isoformat()
}, f)
return _session
def request(method, url, retry=True, **kwargs):
global _session_expire
session = cached_auth_in()
res = session.request(method, url, **kwargs)
err = re.search(r"qrcodeerror\.jsp\?errorcode=(\d+)", res.text)
sessionExp = re.search(r"rebuildsessionresponse\.jsp", res.text)
if err or sessionExp:
if err:
print('[!]', 'An error occurred during request:', err.group(1))
if retry:
print('[!]', 'Refreshing session...')
_session_expire = datetime(1970, 1, 1)
cached_auth_in()
return request(method, url, retry=False, **kwargs)
else:
raise Exception('Request failed, error code: %s' % err.group(1))
return res
def channel_list():
res = request('post', API_EPG_BASE + 'function/frameset_builder.jsp', data={
"MAIN_WIN_SRC": "/iptvepg/frame1442/portal.jsp?tempno=-1",
"NEED_UPDATE_STB": "1",
"BUILD_ACTION": "FRAMESET_BUILDER",
"hdmistatus": "undefined"
})
# parse channel info
channels_info = re.findall(r"jsSetChannelInfo\(([^)]+)\);", res.text)
channels_info = [json.loads('[%s]' % channel_info.replace("'", '"')) for channel_info in channels_info]
attributes = ['userChannelID', 'timeShift', 'TSTVtime', 'isIgmp', 'channelId', 'channelName', 'columnId',
'channelType', 'pipEnable', 'lpvrEnable', 'channelLevel', 'isCanLock', 'isIPPV', 'mixno',
'cdnchannelCode', 'advertisecontent', 'definition', 'tvPauseEnable', 'ottcdnchannelcode',
'funcswitch', 'allownettype']
channels_info = {channel[4]: {k: channel[i] for i, k in enumerate(attributes)} for channel in channels_info}
# parse channel config
channels = re.findall(r"jsSetConfig\('Channel', *'([^']+)'\)", res.text)
channels = [json.loads('{%s}' % re.sub(r"(,|^) *([a-zA-Z0-9]+) *=", r'\1"\2":', channel)) for channel in channels]
# merge channel info to config
channels = [{**channel, **channels_info[channel['ChannelID']]} for channel in channels]
# 获取分类名称
res = request('get', API_EPG_BASE + 'frame1451/sdk_getcolumnlist.jsp?columncode=01').json()
assert res['returncode'] == '0'
columns = {column['columncode']: column['columnname'] for column in res['data']}
channels = [{**channel, 'columnname': columns[channel['columnId']]} for channel in channels]
return channels
def generate_epg(channels):
today = datetime.now()
today_plus_3 = today.replace(day=today.day + 3)
tv = Element('tv')
for channel in channels:
channel_el = SubElement(tv, 'channel', id=channel['UserChannelID'])
SubElement(channel_el, 'display-name', lang="zh").text = channel['ChannelName']
channel_id_map = {channel['ChannelID']: channel['UserChannelID'] for channel in channels}
def parse_time(time):
return datetime.strptime(time, '%Y.%m.%d %H:%M:%S').strftime('%Y%m%d%H%M%S +0800')
for i, channel in enumerate(channels):
if i % 10 == 0:
print('[*]', 'Fetching EPG for channel', i + 1, '/', len(channels))
# 获取节目预告
res = request('get', API_EPG_BASE + 'frame1451/sdk_getprevuellist.jsp?', params={
"channelcode": channel['ChannelID'],
"begintime": "{} 00:00:00".format(today.strftime('%Y.%m.%d')),
"endtime": "{} 23:59:59".format(today_plus_3.strftime('%Y.%m.%d')),
"utcbegintime": "",
"utcendtime": ""
}).json()
if res['returncode'] != '0':
print('[!]', 'Failed to fetch EPG for', channel['ChannelName'])
print(res)
continue
for program in res['data']:
programme = SubElement(tv, 'programme',
start=parse_time(program['begintime']),
stop=parse_time(program['endtime']),
channel=channel_id_map[program['channelcode']])
SubElement(programme, 'title', lang="zh").text = program['prevuename']
SubElement(programme, 'desc', lang="zh").text = program['description']
time.sleep(random.random())
return (b'<?xml version="1.0" encoding="UTF-8"?>\n<!DOCTYPE tv SYSTEM "xmltv.dtd">\n'
+ tostring(tv, encoding='utf-8'))
def generate_m3u(channels):
m3u = ['#EXTM3U url-tvg="{base}/epg.xml" x-tvg-url="{base}/epg.xml"'.format(base=SERVICE_BASE)]
for channel in channels:
m3u_item = ['#EXTINF:-1', 'tvg-id="{}"'.format(channel['UserChannelID']), 'tvg-name="{}"'.format(channel['ChannelName']), 'tvg-group="{}"'.format(channel['columnname'])]
if os.path.exists('web/icons/{}.png'.format(channel['ChannelID'])):
m3u_item.append('tvg-logo="{}/icons/{}.png"'.format(SERVICE_BASE, channel['ChannelID']))
if channel['TimeShift'] == '1':
m3u_item.append('catchup="append"')
m3u_item.append('catchup-source="{}"'.format(channel['TimeShiftURL']))
m3u.append(' '.join(m3u_item) + ',' + channel['ChannelName'])
m3u.append(re.sub(r"^igmp://", UDPXY_BASE, channel['ChannelURL']))
return '\n'.join(m3u)
if __name__ == '__main__':
channels = channel_list()
with open('web/epg.xml', 'wb') as f:
f.write(generate_epg(channels))
with open('web/iptv.m3u', 'w') as f:
f.write(generate_m3u(channels))
print('[*]', 'Done')
Step3: 安装 Python 环境
将脚本放置在路由器任意目录,这里以/root/iptv/iptv.py
为例,并在同目录下创建web
文件夹。
执行下面命令安装 python 及依赖:
opkg install python3 python3-pip python3-venv
cd /root/iptv
virtualenv venv
/root/iptv/venv/bin/pip install pycryptodome requests
手动执行 /root/iptv/venv/bin/python /root/iptv/iptv.py
测试生成节目单与播放列表。
在 /root/iptv/web/icons
放入台标,以 [ChannelID].png
格式命名。
在系统-计划任务追加下面内容:
0 1 */1 * * /root/iptv/venv/bin/python /root/iptv/iptv.py >> /var/log/iptv.log
用 Docker 创建一个 WEB 服务器:
docker run --restart unless-stopped --name nginx -v /root/iptv/web:/usr/share/nginx/html -v /var/log/nginx:/var/log/nginx -p 1234:80 -d nginx
Step4: 设置播放器
- xmltv 地址:
http://192.168.1.1:1234/epg.xml
- m3u 地址:
http://192.168.1.1:1234/iptv.m3u
最终效果如下图(如使用 jellyfin):