m3u parsing improvement

This commit is contained in:
DYefremov
2024-01-20 15:32:45 +03:00
parent a432da60e5
commit f3d133b7a3
3 changed files with 43 additions and 36 deletions

View File

@@ -2,7 +2,7 @@
#
# The MIT License (MIT)
#
# Copyright (c) 2018-2023 Dmitriy Yefremov
# Copyright (c) 2018-2024 Dmitriy Yefremov
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -58,6 +58,9 @@ class StreamType(Enum):
def parse_m3u(path, s_type, detect_encoding=True, params=None):
""" Parses *m3u* file and returns tuple with EPG src URLs and services list. """
pattern = re.compile(r'(\S+)="(.*?)"')
with open(path, "rb") as file:
data = file.read()
encoding = "utf-8"
@@ -73,8 +76,10 @@ def parse_m3u(path, s_type, detect_encoding=True, params=None):
aggr = [None] * 10
s_aggr = aggr[: -3]
services = []
epg_src = None
group = None
groups = set()
services = []
marker_counter = 1
sid_counter = 1
name = None
@@ -85,47 +90,42 @@ def parse_m3u(path, s_type, detect_encoding=True, params=None):
m_name = BqServiceType.MARKER.name
for line in str(data, encoding=encoding, errors="ignore").splitlines():
if line.startswith("#EXTM3U"):
data = dict(pattern.findall(line))
epg_src = data.get("x-tvg-url", data.get("url-tvg", None))
epg_src = epg_src.split(",") if epg_src else None
if line.startswith("#EXTINF"):
line, sep, name = line.rpartition(",")
data = re.split('"', line)
size = len(data)
if size < 3:
continue
d = {data[i].lower().strip(" ="): data[i + 1] for i in range(0, len(data) - 1, 2)}
picon = d.get("tvg-logo", None)
data = dict(pattern.findall(line))
name = data.get("tvg-name", name)
picon = data.get("tvg-logo", None)
if s_type is SettingsType.ENIGMA_2:
grp_name = d.get("group-title", None)
if grp_name not in groups:
groups.add(grp_name)
fav_id = MARKER_FORMAT.format(marker_counter, grp_name, grp_name)
marker_counter += 1
mr = Service(None, None, None, grp_name, *aggr[0:3], m_name, *aggr, fav_id, None)
services.append(mr)
group = data.get("group-title", None)
elif line.startswith("#EXTGRP") and s_type is SettingsType.ENIGMA_2:
grp_name = line.strip("#EXTGRP:").strip()
if grp_name not in groups:
groups.add(grp_name)
fav_id = MARKER_FORMAT.format(marker_counter, grp_name, grp_name)
marker_counter += 1
mr = Service(None, None, None, grp_name, *aggr[0:3], m_name, *aggr, fav_id, None)
services.append(mr)
elif not line.startswith("#"):
group = line.strip("#EXTGRP:").strip()
elif not line.startswith("#") and "://" in line:
url = line.strip()
params[0] = sid_counter
sid_counter += 1
fav_id = get_fav_id(url, name, s_type, params)
if s_type is SettingsType.ENIGMA_2:
p_id = get_picon_id(params)
if group not in groups:
# Some playlists have "random" of group names.
# We will take only the first one we found on the list!
groups.add(group)
m_id = MARKER_FORMAT.format(marker_counter, group, group)
marker_counter += 1
services.append(Service(None, None, None, group, *aggr[0:3], m_name, *aggr, m_id, None))
if all((name, url, fav_id)):
srv = Service(None, None, IPTV_ICON, name, *aggr[0:3], st, picon, p_id, *s_aggr, url, fav_id, None)
services.append(srv)
services.append(Service(None, None, IPTV_ICON, name, *aggr[0:2], group,
st, picon, p_id, *s_aggr, url, fav_id, None))
else:
log(f"*.m3u* parse error ['{path}']: name[{name}], url[{url}], fav id[{fav_id}]")
return services
return epg_src, services
def export_to_m3u(path, bouquet, s_type, url=None):