diff --git a/runtime/ops/mapper/__init__.py b/runtime/ops/mapper/__init__.py index 4b970199..ed0a0fcb 100644 --- a/runtime/ops/mapper/__init__.py +++ b/runtime/ops/mapper/__init__.py @@ -47,6 +47,21 @@ def _import_operators(): from . import remove_duplicate_sentences from . import knowledge_relation_slice from . import pii_ner_detection - + # ===== Video operators (PR1-PR5) ===== + from . import _video_common + from . import video_format_convert + from . import video_sensitive_detect + from . import video_sensitive_crop + from . import video_mot_track + from . import video_subject_crop + from . import video_classify_qwenvl + from . import video_summary_qwenvl + from . import video_event_tag_qwenvl + from . import video_keyframe_extract + from . import video_deborder_crop + from . import video_audio_extract + from . import video_speech_asr + from . import video_subtitle_ocr + from . import video_text_ocr _import_operators() diff --git a/runtime/ops/mapper/_video_common/__init__.py b/runtime/ops/mapper/_video_common/__init__.py new file mode 100644 index 00000000..7c68785e --- /dev/null +++ b/runtime/ops/mapper/_video_common/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/ffmpeg.py b/runtime/ops/mapper/_video_common/ffmpeg.py new file mode 100644 index 00000000..c0340c04 --- /dev/null +++ b/runtime/ops/mapper/_video_common/ffmpeg.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +import os +import subprocess + +def run_cmd(cmd, logger=None): + if logger: + logger.info("FFmpeg cmd: " + " ".join(cmd)) + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + msg = f"FFmpeg failed (code={p.returncode}).\nSTDOUT:\n{p.stdout}\nSTDERR:\n{p.stderr}" + raise RuntimeError(msg) + return p.stdout, p.stderr + +def convert_to_mp4_h264( + in_path: str, + out_path: str, + crf: int = 23, + preset: str = "veryfast", + audio: bool = True, + fps: int = None, + scale: str = None, # e.g. "1280:720" or None + logger=None, +): + """ + 最通用的“交付格式”:mp4(H.264) + yuv420p + - crf 越小质量越高,体积越大(18~28常用) + - preset 越慢压缩越好但越耗时(veryfast/fast/medium) + """ + os.makedirs(os.path.dirname(out_path), exist_ok=True) + + cmd = ["ffmpeg", "-y", "-i", in_path] + + # 视频参数 + cmd += ["-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", preset, "-crf", str(crf)] + + # 可选 fps / scale + if fps is not None: + cmd += ["-r", str(int(fps))] + if scale is not None: + cmd += ["-vf", f"scale={scale}"] + + # 音频 + if audio: + cmd += ["-c:a", "aac", "-b:a", "128k"] + else: + cmd += ["-an"] + + cmd += [out_path] + return run_cmd(cmd, logger=logger) + +def transcode_any( + in_path: str, + out_path: str, + vcodec: str = "libx264", + acodec: str = "aac", + pix_fmt: str = "yuv420p", + crf: int = 23, + preset: str = "veryfast", + vbitrate: str = None, # e.g. "2M" + abitrate: str = "128k", + fps: int = None, + scale: str = None, # e.g. "1280:720" + extra_args: list = None, + logger=None, +): + """ + 通用转码:支持任意容器/编码器组合 + - vcodec/acodec 支持 'copy'(封装重打包或直接流拷贝) + - out_path 后缀决定容器格式:.mp4/.mkv/.mov/.avi/.wmv... + """ + os.makedirs(os.path.dirname(out_path), exist_ok=True) + cmd = ["ffmpeg", "-y", "-i", in_path] + + # video + cmd += ["-c:v", vcodec] + if vcodec != "copy": + cmd += ["-pix_fmt", pix_fmt] + if crf is not None: + cmd += ["-crf", str(crf)] + if preset: + cmd += ["-preset", preset] + if vbitrate: + cmd += ["-b:v", str(vbitrate)] + + # fps/scale + if fps is not None: + cmd += ["-r", str(int(fps))] + if scale is not None: + cmd += ["-vf", f"scale={scale}"] + + # audio + cmd += ["-c:a", acodec] + if acodec != "copy": + if abitrate: + cmd += ["-b:a", str(abitrate)] + + if extra_args: + cmd += list(extra_args) + + cmd += [out_path] + return run_cmd(cmd, logger=logger) + + + +def cut_segment(in_path: str, out_path: str, start: float, end: float, logger=None): + os.makedirs(os.path.dirname(out_path), exist_ok=True) + cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end), "-i", in_path, "-c", "copy", out_path] + return run_cmd(cmd, logger=logger) + +def concat_segments(segment_paths, out_path: str, logger=None): + os.makedirs(os.path.dirname(out_path), exist_ok=True) + list_file = out_path + ".txt" + with open(list_file, "w", encoding="utf-8") as f: + for p in segment_paths: + f.write(f"file '{os.path.abspath(p)}'\n") + cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", out_path] + return run_cmd(cmd, logger=logger) \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/io_video.py b/runtime/ops/mapper/_video_common/io_video.py new file mode 100644 index 00000000..787a9b6c --- /dev/null +++ b/runtime/ops/mapper/_video_common/io_video.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +import cv2 + +def get_video_info(video_path: str): + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {video_path}") + fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + return fps, width, height, frames \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/log.py b/runtime/ops/mapper/_video_common/log.py new file mode 100644 index 00000000..a47e9d32 --- /dev/null +++ b/runtime/ops/mapper/_video_common/log.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +import logging +import os + +def get_logger(name: str, log_dir: str = None): + logger = logging.getLogger(name) + if logger.handlers: + return logger + + logger.setLevel(logging.INFO) + fmt = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s") + + sh = logging.StreamHandler() + sh.setFormatter(fmt) + logger.addHandler(sh) + + if log_dir: + os.makedirs(log_dir, exist_ok=True) + fh = logging.FileHandler(os.path.join(log_dir, "run.log"), encoding="utf-8") + fh.setFormatter(fmt) + logger.addHandler(fh) + + return logger \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/model_paths.py b/runtime/ops/mapper/_video_common/model_paths.py new file mode 100644 index 00000000..1a9bffe2 --- /dev/null +++ b/runtime/ops/mapper/_video_common/model_paths.py @@ -0,0 +1,28 @@ +import os + +def get_model_root(params=None) -> str: + """ + 模型根目录优先级: + 1) params['model_root'] + 2) 环境变量 DATAMATE_MODEL_ROOT + 3) 默认 /mnt/models + """ + params = params or {} + return params.get("model_root") or os.environ.get("DATAMATE_MODEL_ROOT") or "/mnt/models" + + +def resolve_model_path(params, param_key: str, default_rel: str) -> str: + """ + 解析模型路径: + - 如果 params[param_key] 是绝对路径:直接用 + - 如果是相对路径:拼到 model_root + - 如果没传:用 model_root + default_rel + """ + params = params or {} + root = get_model_root(params) + + v = params.get(param_key) + if v: + return v if os.path.isabs(v) else os.path.join(root, v) + + return os.path.join(root, default_rel) \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/paths.py b/runtime/ops/mapper/_video_common/paths.py new file mode 100644 index 00000000..89591302 --- /dev/null +++ b/runtime/ops/mapper/_video_common/paths.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +import os +import time +import uuid + +def ensure_dir(p: str): + os.makedirs(p, exist_ok=True) + return p + +def make_run_dir(export_path: str, op_name: str): + """ + 统一输出目录:{export_path}/{op_name}/{timestamp_uuid}/ + """ + ts = time.strftime("%Y%m%d_%H%M%S") + run_id = f"{ts}_{uuid.uuid4().hex[:8]}" + out_dir = os.path.join(export_path, op_name, run_id) + ensure_dir(out_dir) + return out_dir \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/qwen_http_client.py b/runtime/ops/mapper/_video_common/qwen_http_client.py new file mode 100644 index 00000000..b6640f69 --- /dev/null +++ b/runtime/ops/mapper/_video_common/qwen_http_client.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +import os +import json +import cv2 +import requests + +def qwenvl_infer_by_image_path( + image_path: str, + task: str, + service_url: str = "http://127.0.0.1:18080", + max_new_tokens: int = 64, + language: str = "zh", + style: str = "normal", + timeout: int = 180, +): + """ + 对齐你当前服务端 qwen_vl_server.py 的接口: + POST {service_url}/infer + JSON: {image_path, task, max_new_tokens, language, style} + + 返回:服务端 jsonify 的 dict + """ + sess = requests.Session() + sess.trust_env = False # 避免系统代理拦 localhost + + payload = { + "image_path": image_path, + "task": task, + "max_new_tokens": int(max_new_tokens), + "language": language, + "style": style, + } + r = sess.post(service_url.rstrip("/") + "/infer", json=payload, timeout=timeout) + r.raise_for_status() + return r.json() + +def save_frame_to_jpg(frame_bgr, out_path: str): + os.makedirs(os.path.dirname(out_path), exist_ok=True) + ok = cv2.imwrite(out_path, frame_bgr) + if not ok: + raise RuntimeError(f"failed to write jpg: {out_path}") + return out_path \ No newline at end of file diff --git a/runtime/ops/mapper/_video_common/schema.py b/runtime/ops/mapper/_video_common/schema.py new file mode 100644 index 00000000..d566359e --- /dev/null +++ b/runtime/ops/mapper/_video_common/schema.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +def init_tracks_schema(video_path, fps, width, height): + return { + "video": video_path, + "fps": float(fps), + "width": int(width), + "height": int(height), + "frames": [] # {"frame_id": i, "objects":[{"track_id":..,"bbox":[..],...}]} + } \ No newline at end of file diff --git a/runtime/ops/mapper/video_audio_extract/__init__.py b/runtime/ops/mapper/video_audio_extract/__init__.py new file mode 100644 index 00000000..674e260e --- /dev/null +++ b/runtime/ops/mapper/video_audio_extract/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoAudioExtract", + module_path="ops.mapper.video_audio_extract.process", +) diff --git a/runtime/ops/mapper/video_audio_extract/metadata.yml b/runtime/ops/mapper/video_audio_extract/metadata.yml new file mode 100644 index 00000000..6486b513 --- /dev/null +++ b/runtime/ops/mapper/video_audio_extract/metadata.yml @@ -0,0 +1,16 @@ +name: '视频抽取音频' +name_en: 'Video Audio Extract' +description: '从视频中抽取音频,默认输出 wav(16k/mono);也可输出 aac,并生成音频信息 audio_info.json。' +description_en: 'Extract audio from video, default wav (16k/mono); can output aac; also generates audio_info.json.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoAudioExtract' +version: '1.0.0' +types: + - 'cleaning' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'audio' \ No newline at end of file diff --git a/runtime/ops/mapper/video_audio_extract/process.py b/runtime/ops/mapper/video_audio_extract/process.py new file mode 100644 index 00000000..08ff6b48 --- /dev/null +++ b/runtime/ops/mapper/video_audio_extract/process.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +import os +import json +import shutil +import subprocess + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger + + +class VideoAudioExtract: + """从视频提取音频(wav 16k mono) + + params: + - ffmpeg_path: str, optional + - sample_rate: int, default 16000 + - channels: int, default 1 + - out_format: wav|aac, default wav + + outputs: + - artifacts/audio.wav (or audio.aac) + - artifacts/audio_info.json + """ + + @staticmethod + def execute(sample, params): + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + op_name = "video_audio_extract" + out_dir = make_run_dir(export_path, op_name) + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + + logger = get_logger(op_name, log_dir) + logger.info(f"video={video_path}") + logger.info(f"out_dir={out_dir}") + + ffmpeg_path = params.get("ffmpeg_path") or shutil.which("ffmpeg") + if not ffmpeg_path: + raise RuntimeError("ffmpeg not found. Please install ffmpeg or pass params.ffmpeg_path") + + sr = int(params.get("sample_rate", 16000)) + ch = int(params.get("channels", 1)) + out_format = (params.get("out_format", "wav") or "wav").lower() + + if out_format == "aac": + audio_path = os.path.join(art_dir, "audio.aac") + cmd = [ + ffmpeg_path, "-hide_banner", "-y", + "-i", video_path, + "-vn", + "-ac", str(ch), + "-ar", str(sr), + "-c:a", "aac", + audio_path + ] + else: + audio_path = os.path.join(art_dir, "audio.wav") + cmd = [ + ffmpeg_path, "-hide_banner", "-y", + "-i", video_path, + "-vn", + "-ac", str(ch), + "-ar", str(sr), + "-c:a", "pcm_s16le", + audio_path + ] + + logger.info("FFmpeg cmd: " + " ".join(cmd)) + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + raise RuntimeError(f"FFmpeg failed (code={p.returncode}).\nSTDERR:\n{p.stderr}") + + info = {"audio_path": audio_path, "sample_rate": sr, "channels": ch, "format": out_format} + info_path = os.path.join(art_dir, "audio_info.json") + with open(info_path, "w", encoding="utf-8") as f: + json.dump(info, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. audio={audio_path}") + return {"out_dir": out_dir, "audio_path": audio_path, "audio_info": info_path} \ No newline at end of file diff --git a/runtime/ops/mapper/video_classify_qwenvl/__init__.py b/runtime/ops/mapper/video_classify_qwenvl/__init__.py new file mode 100644 index 00000000..1a47cd17 --- /dev/null +++ b/runtime/ops/mapper/video_classify_qwenvl/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoClassifyQwenVL", + module_path="ops.mapper.video_classify_qwenvl.process", +) diff --git a/runtime/ops/mapper/video_classify_qwenvl/metadata.yml b/runtime/ops/mapper/video_classify_qwenvl/metadata.yml new file mode 100644 index 00000000..1f27ca1a --- /dev/null +++ b/runtime/ops/mapper/video_classify_qwenvl/metadata.yml @@ -0,0 +1,16 @@ +name: '视频分类(QwenVL)' +name_en: 'Video Classify (QwenVL)' +description: '抽帧调用 QwenVL classify25,多帧投票输出分类结果 classification.json。' +description_en: 'Sample frames and call QwenVL classify25; vote to output classification.json.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoClassifyQwenVL' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_classify_qwenvl/process.py b/runtime/ops/mapper/video_classify_qwenvl/process.py new file mode 100644 index 00000000..48a0a608 --- /dev/null +++ b/runtime/ops/mapper/video_classify_qwenvl/process.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +import os +import json +import collections +import cv2 + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from .._video_common.qwen_http_client import qwenvl_infer_by_image_path, save_frame_to_jpg + + +def _sample_frame_indices(total_frames: int, fps: float, sample_fps: float, max_frames: int): + if total_frames <= 0: + return [] + fps = float(fps) if fps else 25.0 + step = max(1, int(round(fps / max(float(sample_fps), 1e-6)))) + idxs = list(range(0, total_frames, step)) + if max_frames and len(idxs) > int(max_frames): + n = int(max_frames) + idxs = [idxs[int(i * (len(idxs) - 1) / max(1, n - 1))] for i in range(n)] + return idxs + + +class VideoClassifyQwenVL: + """ + 抽帧 + QwenVL HTTP 分类(对齐服务端 task=classify25): + 返回: {class_id, class_name, raw} + + params: + - service_url: 默认 http://127.0.0.1:18080 + - timeout_sec: 默认 180 + - sample_fps: 默认 1.0 + - max_frames: 默认 12 + - return_topk: 默认 3 + - max_new_tokens: 默认 16 + outputs: + - artifacts/classification.json + """ + + def execute(self, sample, params=None): + params = params or {} + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + out_dir = make_run_dir(export_path, "video_classify_qwenvl") + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + frames_dir = ensure_dir(os.path.join(art_dir, "frames")) + logger = get_logger("VideoClassifyQwenVL", log_dir) + + service_url = params.get("service_url", "http://127.0.0.1:18080") + timeout_sec = int(params.get("timeout_sec", 180)) + sample_fps = float(params.get("sample_fps", 1.0)) + max_frames = int(params.get("max_frames", 12)) + return_topk = int(params.get("return_topk", 3)) + max_new_tokens = int(params.get("max_new_tokens", 16)) + + fps, W, H, total_frames = get_video_info(video_path) + idxs = _sample_frame_indices(total_frames, fps, sample_fps, max_frames) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {video_path}") + + votes = collections.Counter() + evidence = [] + + for idx in idxs: + cap.set(cv2.CAP_PROP_POS_FRAMES, idx) + ok, frame = cap.read() + if not ok: + continue + + frame_jpg = os.path.join(frames_dir, f"{idx:06d}.jpg") + save_frame_to_jpg(frame, frame_jpg) + + try: + res = qwenvl_infer_by_image_path( + image_path=frame_jpg, + task="classify25", + service_url=service_url, + max_new_tokens=max_new_tokens, + timeout=timeout_sec, + ) + except Exception as e: + logger.error(f"classify infer failed frame={idx}: {repr(e)}") + continue + + class_name = (res.get("class_name") or "其他").strip() + class_id = int(res.get("class_id", 25)) + votes[class_name] += 1 + evidence.append({"frame_idx": idx, "image_path": frame_jpg, "class_id": class_id, "class_name": class_name}) + + cap.release() + + topk = [{"label": k, "vote": int(v)} for k, v in votes.most_common(return_topk)] + top1 = topk[0]["label"] if topk else "其他" + + result = { + "top1": top1, + "topk": topk, + "service_url": service_url, + "sample_fps": sample_fps, + "max_frames": max_frames, + "evidence": evidence, + } + + json_path = os.path.join(art_dir, "classification.json") + with open(json_path, "w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. classification_json={json_path}, top1={top1}") + return {"out_dir": out_dir, "classification_json": json_path, "top1": top1} \ No newline at end of file diff --git a/runtime/ops/mapper/video_deborder_crop/__init__.py b/runtime/ops/mapper/video_deborder_crop/__init__.py new file mode 100644 index 00000000..c8f8e4d0 --- /dev/null +++ b/runtime/ops/mapper/video_deborder_crop/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoDeborderCrop", + module_path="ops.mapper.video_deborder_crop.process", +) diff --git a/runtime/ops/mapper/video_deborder_crop/metadata.yml b/runtime/ops/mapper/video_deborder_crop/metadata.yml new file mode 100644 index 00000000..71ac6ea2 --- /dev/null +++ b/runtime/ops/mapper/video_deborder_crop/metadata.yml @@ -0,0 +1,16 @@ +name: '视频去黑边裁剪' +name_en: 'Video Deborder Crop' +description: '使用 ffmpeg cropdetect 自动检测黑边并裁剪输出 deborder.mp4;也支持 force_crop 指定裁剪框。' +description_en: 'Detect black borders via ffmpeg cropdetect and crop to output deborder.mp4; supports force_crop.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoDeborderCrop' +version: '1.0.0' +types: + - 'cleaning' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'video' \ No newline at end of file diff --git a/runtime/ops/mapper/video_deborder_crop/process.py b/runtime/ops/mapper/video_deborder_crop/process.py new file mode 100644 index 00000000..02e00924 --- /dev/null +++ b/runtime/ops/mapper/video_deborder_crop/process.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +import os +import re +import json +import shutil +import subprocess +from dataclasses import dataclass +from typing import List, Optional, Tuple + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger + + +@dataclass +class CropBox: + w: int + h: int + x: int + y: int + + def to_str(self) -> str: + return f"{self.w}:{self.h}:{self.x}:{self.y}" + + +def _even(x: int) -> int: + return x - (x % 2) + + +def _parse_cropdetect(stderr: str) -> List[CropBox]: + # ffmpeg cropdetect logs like: "crop=iw:ih:x:y" or "crop=1920:800:0:140" + boxes = [] + for line in stderr.splitlines(): + m = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", line) + if m: + w, h, x, y = map(int, m.groups()) + boxes.append(CropBox(w=w, h=h, x=x, y=y)) + return boxes + + +def _pick_box(boxes: List[CropBox], mode: str = "safe_keep_more") -> Optional[CropBox]: + """ + mode: + - safe_keep_more: 尽量少裁(更保守,避免误裁内容)=> 取 w/h 最大 + x/y 最小 + - aggressive_remove_more: 尽量多裁黑边 => 取 w/h 最小 + x/y 最大 + - median: 取中位数 + """ + if not boxes: + return None + + ws = sorted(b.w for b in boxes) + hs = sorted(b.h for b in boxes) + xs = sorted(b.x for b in boxes) + ys = sorted(b.y for b in boxes) + + if mode == "aggressive_remove_more": + w, h, x, y = min(ws), min(hs), max(xs), max(ys) + elif mode == "median": + mid = len(ws) // 2 + w, h, x, y = ws[mid], hs[mid], xs[mid], ys[mid] + else: + # 默认:尽量少裁,避免裁掉内容 + w, h, x, y = max(ws), max(hs), min(xs), min(ys) + + # crop 参数通常要求偶数(编码器/像素格式更兼容) + return CropBox(w=_even(w), h=_even(h), x=_even(x), y=_even(y)) + + +def detect_crop_box( + ffmpeg_path: str, + video_path: str, + sample_points: List[Tuple[float, float]], + cropdetect: str, + logger, +) -> Optional[CropBox]: + """在多个时间点探测 crop,汇总后给出一个 crop box。""" + all_boxes: List[CropBox] = [] + for (ss, dur) in sample_points: + cmd = [ + ffmpeg_path, "-hide_banner", "-y", + "-ss", f"{ss}", + "-i", video_path, + "-t", f"{dur}", + "-vf", cropdetect, + "-f", "null", "-" + ] + logger.info("cropdetect cmd: " + " ".join(cmd)) + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + # cropdetect 输出在 stderr;即使 returncode!=0 也可能有输出,所以不直接失败 + boxes = _parse_cropdetect(p.stderr) + if boxes: + # 取该段最后一个(通常更稳定) + all_boxes.append(boxes[-1]) + + # 汇总选择一个 box(默认保守:少裁) + return _pick_box(all_boxes, mode="safe_keep_more") + + +def crop_video( + ffmpeg_path: str, + video_path: str, + out_path: str, + crop: CropBox, + logger, + crf: int = 23, + preset: str = "veryfast", + audio_copy: bool = True, +): + # 裁剪会改变尺寸,必须重新编码视频;音频可以 copy + cmd = [ + ffmpeg_path, "-hide_banner", "-y", + "-i", video_path, + "-vf", f"crop={crop.to_str()}", + "-c:v", "libx264", + "-preset", preset, + "-crf", str(crf), + "-pix_fmt", "yuv420p", + ] + if audio_copy: + cmd += ["-c:a", "copy"] + else: + cmd += ["-c:a", "aac", "-b:a", "128k"] + + cmd += [out_path] + + logger.info("crop cmd: " + " ".join(cmd)) + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + raise RuntimeError(f"ffmpeg crop failed (code={p.returncode}).\nSTDERR:\n{p.stderr}") + + +class VideoDeborderCrop: + """去黑边(自动 cropdetect + crop) + + params: + - ffmpeg_path: str, optional + - cropdetect: str, default "cropdetect=24:16:0" + - sample_points: list, optional + 默认会采样 [(0,2),(5,2)];如果视频很短也没关系 + - force_crop: str, optional # 直接指定 "w:h:x:y" + - crf: int, default 23 + - preset: str, default "veryfast" + - audio_copy: bool, default True + + outputs: + - artifacts/deborder.mp4 + - artifacts/crop_params.json + """ + + @staticmethod + def execute(sample, params): + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + op_name = "video_deborder_crop" + out_dir = make_run_dir(export_path, op_name) + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + logger = get_logger(op_name, log_dir) + + logger.info(f"video={video_path}") + logger.info(f"out_dir={out_dir}") + + ffmpeg_path = params.get("ffmpeg_path") or shutil.which("ffmpeg") + if not ffmpeg_path: + raise RuntimeError("ffmpeg not found. Please install ffmpeg or pass params.ffmpeg_path") + + cropdetect = params.get("cropdetect", "cropdetect=24:16:0") + force_crop = params.get("force_crop", None) + crf = int(params.get("crf", 23)) + preset = params.get("preset", "veryfast") + audio_copy = bool(params.get("audio_copy", True)) + + # 默认采样点:开头 2s + 5s 处 2s + sample_points = params.get("sample_points", None) + if not sample_points: + sample_points = [(0.0, 2.0), (5.0, 2.0)] + + crop_box: Optional[CropBox] = None + if force_crop: + m = re.match(r"(\d+):(\d+):(\d+):(\d+)", str(force_crop)) + if not m: + raise ValueError('force_crop must be "w:h:x:y"') + w, h, x, y = map(int, m.groups()) + crop_box = CropBox(w=_even(w), h=_even(h), x=_even(x), y=_even(y)) + else: + crop_box = detect_crop_box(ffmpeg_path, video_path, sample_points, cropdetect, logger) + + if not crop_box: + # 探测不到就原样输出(不裁剪) + logger.warning("cropdetect found nothing, keep original video.") + crop_box = CropBox(w=0, h=0, x=0, y=0) + + out_mp4 = os.path.join(art_dir, "deborder.mp4") + crop_json = os.path.join(art_dir, "crop_params.json") + + if crop_box.w == 0 or crop_box.h == 0: + # 直接复制(不裁) + cmd = [ffmpeg_path, "-hide_banner", "-y", "-i", video_path, "-c", "copy", out_mp4] + logger.info("copy cmd: " + " ".join(cmd)) + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + raise RuntimeError(f"ffmpeg copy failed (code={p.returncode}).\nSTDERR:\n{p.stderr}") + info = {"mode": "copy", "crop": None, "out_mp4": out_mp4} + else: + crop_video(ffmpeg_path, video_path, out_mp4, crop_box, logger, crf=crf, preset=preset, audio_copy=audio_copy) + info = {"mode": "crop", "crop": crop_box.__dict__, "out_mp4": out_mp4} + + with open(crop_json, "w", encoding="utf-8") as f: + json.dump(info, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. deborder_mp4={out_mp4}") + return {"out_dir": out_dir, "deborder_mp4": out_mp4, "crop_params_json": crop_json} \ No newline at end of file diff --git a/runtime/ops/mapper/video_event_tag_qwenvl/__init__.py b/runtime/ops/mapper/video_event_tag_qwenvl/__init__.py new file mode 100644 index 00000000..5ac7d0d8 --- /dev/null +++ b/runtime/ops/mapper/video_event_tag_qwenvl/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoEventTagQwenVL", + module_path="ops.mapper.video_event_tag_qwenvl.process", +) diff --git a/runtime/ops/mapper/video_event_tag_qwenvl/metadata.yml b/runtime/ops/mapper/video_event_tag_qwenvl/metadata.yml new file mode 100644 index 00000000..56d797f7 --- /dev/null +++ b/runtime/ops/mapper/video_event_tag_qwenvl/metadata.yml @@ -0,0 +1,16 @@ +name: '事件标注(QwenVL)' +name_en: 'Video Event Tag (QwenVL)' +description: '自适应分段取每段中点帧调用 QwenVL event_tag,输出 events.json。' +description_en: 'Adaptive segmentation; call QwenVL event_tag on mid-frame of each segment; outputs events.json.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoEventTagQwenVL' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_event_tag_qwenvl/process.py b/runtime/ops/mapper/video_event_tag_qwenvl/process.py new file mode 100644 index 00000000..43974f27 --- /dev/null +++ b/runtime/ops/mapper/video_event_tag_qwenvl/process.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +import os +import json +import cv2 + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from .._video_common.qwen_http_client import qwenvl_infer_by_image_path, save_frame_to_jpg + + +def _make_segments(duration_sec: float, params: dict): + adaptive = bool(params.get("adaptive_segment", True)) + max_segments = int(params.get("max_segments", 60)) + max_new_tokens = int(params.get("max_new_tokens", 32)) + + if duration_sec <= 0: + return [(0.0, 0.0)] + + if not adaptive: + seg_len = float(params.get("segment_seconds", 5.0)) + else: + target = int(params.get("target_segments", 12)) + min_seg = float(params.get("min_segment_seconds", 2.0)) + max_seg = float(params.get("max_segment_seconds", 60.0)) + seg_len = duration_sec / max(1, target) + seg_len = max(min_seg, min(max_seg, seg_len)) + + segs = [] + s = 0.0 + while s < duration_sec and len(segs) < max_segments: + e = min(duration_sec, s + seg_len) + segs.append((s, e)) + s = e + return segs + + +class VideoEventTagQwenVL: + """ + 分段取中点帧 → QwenVL HTTP 事件标注(对齐服务端 task=event_tag): + 返回: {event} + + params: + - service_url: 默认 http://127.0.0.1:18080 + - timeout_sec: 默认 180 + - adaptive_segment: 默认 True + - target_segments: 默认 12 + - min_segment_seconds: 默认 2.0 + - max_segment_seconds: 默认 60.0 + - segment_seconds: 默认 5.0(当 adaptive_segment=false 时) + - max_segments: 默认 60 + - max_new_tokens: 默认 32 + outputs: + - artifacts/events.json + - artifacts/frames/*.jpg + """ + + def execute(self, sample, params=None): + params = params or {} + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + out_dir = make_run_dir(export_path, "video_event_tag_qwenvl") + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + frames_dir = ensure_dir(os.path.join(art_dir, "frames")) + logger = get_logger("VideoEventTagQwenVL", log_dir) + + service_url = params.get("service_url", "http://127.0.0.1:18080") + timeout_sec = int(params.get("timeout_sec", 180)) + max_new_tokens = int(params.get("max_new_tokens", 32)) + + fps, W, H, total_frames = get_video_info(video_path) + duration_sec = (float(total_frames) / float(fps)) if fps else 0.0 + segs = _make_segments(duration_sec, params) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {video_path}") + + events = [] + for i, (s, e) in enumerate(segs): + mid = (s + e) / 2.0 + mid_frame = int(round(mid * float(fps))) if fps else 0 + cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame) + ok, frame = cap.read() + if not ok: + continue + + frame_jpg = os.path.join(frames_dir, f"seg_{i:04d}_mid_{mid_frame:06d}.jpg") + save_frame_to_jpg(frame, frame_jpg) + + try: + res = qwenvl_infer_by_image_path( + image_path=frame_jpg, + task="event_tag", + service_url=service_url, + max_new_tokens=max_new_tokens, + timeout=timeout_sec, + ) + event = (res.get("event") or "").strip() + except Exception as ex: + logger.error(f"event_tag infer failed seg={i} mid={mid:.2f}: {repr(ex)}") + event = "" + + events.append( + { + "seg_id": i, + "start": float(s), + "end": float(e), + "mid": float(mid), + "mid_frame": int(mid_frame), + "image_path": frame_jpg, + "event": event, + } + ) + + cap.release() + + out_json = os.path.join(art_dir, "events.json") + with open(out_json, "w", encoding="utf-8") as f: + json.dump( + { + "video": video_path, + "service_url": service_url, + "duration_sec": duration_sec, + "segments": events, + }, + f, + ensure_ascii=False, + indent=2, + ) + + logger.info(f"Done. events_json={out_json}, segments={len(events)}") + return {"out_dir": out_dir, "events_json": out_json, "segments_count": len(events)} \ No newline at end of file diff --git a/runtime/ops/mapper/video_format_convert/__init__.py b/runtime/ops/mapper/video_format_convert/__init__.py new file mode 100644 index 00000000..62f6d450 --- /dev/null +++ b/runtime/ops/mapper/video_format_convert/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoFormatConvert", + module_path="ops.mapper.video_format_convert.process", +) diff --git a/runtime/ops/mapper/video_format_convert/metadata.yml b/runtime/ops/mapper/video_format_convert/metadata.yml new file mode 100644 index 00000000..ed1e3f2a --- /dev/null +++ b/runtime/ops/mapper/video_format_convert/metadata.yml @@ -0,0 +1,16 @@ +name: '视频格式转换' +name_en: 'Video Format Convert' +description: '仅做容器格式转换(stream copy,不重编码);输出 converted.xxx 与 convert_result.json。' +description_en: 'Container remux via ffmpeg stream copy (no re-encode); outputs converted.xxx and convert_result.json.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoFormatConvert' +version: '1.0.0' +types: + - 'cleaning' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'video' \ No newline at end of file diff --git a/runtime/ops/mapper/video_format_convert/process.py b/runtime/ops/mapper/video_format_convert/process.py new file mode 100644 index 00000000..3a4ce552 --- /dev/null +++ b/runtime/ops/mapper/video_format_convert/process.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +import os +import json + +from .._video_common.paths import make_run_dir +from .._video_common.log import get_logger +from .._video_common.ffmpeg import run_cmd + + +class VideoFormatConvert: + """ + 仅做“容器格式转换”(不重编码): + - 通过 ffmpeg stream copy 实现:-c:v copy -c:a copy + - 输出文件后缀决定目标容器格式:mp4/mkv/mov/avi/wmv... + + 输入: + sample["filePath"] + sample["export_path"] + + params: + - container: 目标容器后缀(默认 "mp4") + - out_name: 输出文件名(默认 "converted.{container}") + - copy_video: 是否 copy 视频流(默认 True) + - copy_audio: 是否 copy 音频流(默认 True) + - extra_args: 额外 ffmpeg 参数列表(可选) + + 输出: + out_dir/converted.xxx + out_dir/convert_result.json + out_dir/run.log + """ + + def execute(self, sample: dict, params: dict = None): + params = params or {} + in_path = sample["filePath"] + export_path = sample["export_path"] + + out_dir = make_run_dir(export_path, "video_format_convert") + logger = get_logger("VideoFormatConvert", log_dir=out_dir) + + # 目标容器 + container = str(params.get("container", "mp4")).lstrip(".").lower() + out_name = params.get("out_name", f"converted.{container}") + if not out_name.lower().endswith(f".{container}"): + # 防止用户给了不匹配的后缀 + out_name = f"{out_name}.{container}" + out_video = os.path.join(out_dir, out_name) + + copy_video = bool(params.get("copy_video", True)) + copy_audio = bool(params.get("copy_audio", True)) + extra_args = params.get("extra_args", None) # list[str] or None + + logger.info(f"Start container convert (stream copy). in={in_path}, out={out_video}, container={container}") + + cmd = ["ffmpeg", "-y", "-i", in_path] + + # 视频流 + cmd += ["-c:v", "copy" if copy_video else "libx264"] + # 音频流 + cmd += ["-c:a", "copy" if copy_audio else "aac"] + + # 如果用户传了额外参数(例如 -map 0、-movflags +faststart 等) + if extra_args: + if not isinstance(extra_args, list): + raise ValueError("params['extra_args'] must be a list, e.g. ['-movflags', '+faststart']") + cmd += extra_args + + cmd += [out_video] + + try: + run_cmd(cmd, logger=logger) + except Exception as e: + # 给更明确的提示:某些容器不支持某些编码,copy 会失败 + logger.error("Container convert failed. This is usually due to codec/container incompatibility when using stream copy.") + logger.error("You can either choose a different container, or enable re-encode (copy_video/copy_audio=False).") + raise + + result = { + "out_dir": out_dir, + "input": in_path, + "output_video": out_video, + "mode": "stream_copy", + "params": { + "container": container, + "out_name": out_name, + "copy_video": copy_video, + "copy_audio": copy_audio, + "extra_args": extra_args, + }, + } + + json_path = os.path.join(out_dir, "convert_result.json") + with open(json_path, "w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. output={out_video}") + return result \ No newline at end of file diff --git a/runtime/ops/mapper/video_keyframe_extract/__init__.py b/runtime/ops/mapper/video_keyframe_extract/__init__.py new file mode 100644 index 00000000..be2c3c37 --- /dev/null +++ b/runtime/ops/mapper/video_keyframe_extract/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoKeyframeExtract", + module_path="ops.mapper.video_keyframe_extract.process", +) diff --git a/runtime/ops/mapper/video_keyframe_extract/metadata.yml b/runtime/ops/mapper/video_keyframe_extract/metadata.yml new file mode 100644 index 00000000..2779b28e --- /dev/null +++ b/runtime/ops/mapper/video_keyframe_extract/metadata.yml @@ -0,0 +1,16 @@ +name: '关键帧提取' +name_en: 'Video Keyframe Extract' +description: '基于 ffmpeg scene detect 提取关键帧,并可补封面帧(t=0),输出 keyframes.json 与关键帧图片。' +description_en: 'Extract keyframes via ffmpeg scene detect and optional cover frame (t=0); outputs keyframes.json and images.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoKeyframeExtract' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'image' \ No newline at end of file diff --git a/runtime/ops/mapper/video_keyframe_extract/process.py b/runtime/ops/mapper/video_keyframe_extract/process.py new file mode 100644 index 00000000..b2b79551 --- /dev/null +++ b/runtime/ops/mapper/video_keyframe_extract/process.py @@ -0,0 +1,229 @@ +import os +import json +import shutil +import subprocess +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + + +def _run(cmd: List[str]) -> Tuple[int, str]: + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + return p.returncode, (p.stderr or "") + (p.stdout or "") + + +def _ensure_dir(p: str): + os.makedirs(p, exist_ok=True) + + +def _list_jpgs(d: str) -> List[str]: + if not os.path.isdir(d): + return [] + xs = [os.path.join(d, x) for x in os.listdir(d) if x.lower().endswith(".jpg")] + xs.sort() + return xs + + +def _probe_duration(ffprobe_path: str, video_path: str) -> float: + # 尽量不用任何第三方库,直接 ffprobe + cmd = [ + ffprobe_path, "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + video_path + ] + rc, out = _run(cmd) + if rc != 0: + return 0.0 + try: + return float(out.strip().splitlines()[-1]) + except Exception: + return 0.0 + + +@dataclass +class KeyframeParams: + ffmpeg_path: str = "" + ffprobe_path: str = "" + scene_threshold: float = 0.3 + threshold_candidates: Optional[List[float]] = None + max_keyframes: int = 30 + min_interval_sec: float = 1.0 + always_include_first: bool = True + quality: int = 2 # -q:v + out_json_name: str = "keyframes.json" + + +class VideoKeyframeExtractLocal: + """ + 本地运行版:不依赖 datamate。 + 输出: + /artifacts/keyframes/cover.jpg (可选) + /artifacts/keyframes/%06d.jpg (scene 帧) + /artifacts/keyframes.json + """ + + def run(self, video_path: str, out_dir: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + p = KeyframeParams(**(params or {})) + + ffmpeg = p.ffmpeg_path or shutil.which("ffmpeg") + ffprobe = p.ffprobe_path or shutil.which("ffprobe") + if not ffmpeg: + raise RuntimeError("ffmpeg not found. Install ffmpeg or set ffmpeg_path.") + if not ffprobe: + raise RuntimeError("ffprobe not found. Install ffprobe or set ffprobe_path.") + + artifacts = os.path.join(out_dir, "artifacts") + key_dir = os.path.join(artifacts, "keyframes") + _ensure_dir(key_dir) + + duration = _probe_duration(ffprobe, video_path) + + outputs: List[Dict[str, Any]] = [] + + # 1) cover + cover_path = os.path.join(key_dir, "cover.jpg") + if p.always_include_first: + cmd = [ + ffmpeg, "-hide_banner", "-y", + "-ss", "0", + "-i", video_path, + "-frames:v", "1", + "-q:v", str(p.quality), + "-vf", "format=yuvj420p", + cover_path + ] + rc, log = _run(cmd) + if rc == 0 and os.path.exists(cover_path): + outputs.append({"kind": "cover", "time_sec": 0.0, "path": cover_path}) + else: + # cover 失败不致命 + pass + + # 2) scene keyframes + thr_candidates = p.threshold_candidates or [p.scene_threshold, 0.2, 0.15, 0.1, 0.06] + scene_files: List[str] = [] + used_thr: Optional[float] = None + + for thr in thr_candidates: + # 清掉旧的 scene 输出(保留 cover) + for f in _list_jpgs(key_dir): + if os.path.basename(f) != "cover.jpg": + try: + os.remove(f) + except Exception: + pass + + vf = f"select='gt(scene,{thr})',format=yuvj420p" + out_tpl = os.path.join(key_dir, "%06d.jpg") + + # 兼容新旧 ffmpeg + cmd = [ + ffmpeg, "-hide_banner", "-y", + "-i", video_path, + "-vf", vf, + "-q:v", str(p.quality), + "-frames:v", str(p.max_keyframes * 3), + "-fps_mode", "vfr", + out_tpl + ] + rc, log = _run(cmd) + if rc != 0 and "Unrecognized option 'fps_mode'" in log: + cmd = [ + ffmpeg, "-hide_banner", "-y", + "-i", video_path, + "-vf", vf, + "-q:v", str(p.quality), + "-frames:v", str(p.max_keyframes * 3), + "-vsync", "vfr", + out_tpl + ] + rc, log = _run(cmd) + + files = [f for f in _list_jpgs(key_dir) if os.path.basename(f) != "cover.jpg"] + if files: + scene_files = files + used_thr = thr + break + + # 3) fallback:scene=0 时取中间帧 + if not scene_files: + t = duration / 2.0 if duration > 0 else 0.0 + mid_path = os.path.join(key_dir, "000001.jpg") + cmd = [ + ffmpeg, "-hide_banner", "-y", + "-ss", f"{t}", + "-i", video_path, + "-frames:v", "1", + "-q:v", str(p.quality), + "-vf", "format=yuvj420p", + mid_path + ] + rc, log = _run(cmd) + if rc != 0 or (not os.path.exists(mid_path)): + raise RuntimeError(f"KeyframeExtractLocal failed: scene=0 and fallback midframe failed. log={log[-800:]}") + scene_files = [mid_path] + used_thr = None + + # 4) 时间间隔过滤 + 截断 max_keyframes + # 这里用“均匀估计”时间戳(不解析 showinfo),足够用于过滤过密 + if duration > 0 and len(scene_files) > 1: + kept: List[Tuple[float, str]] = [] + last_t = -1e9 + for i, f in enumerate(scene_files): + t = (i / max(1, (len(scene_files) - 1))) * duration + if t - last_t >= p.min_interval_sec: + kept.append((t, f)) + last_t = t + if len(kept) >= p.max_keyframes: + break + for t, f in kept: + outputs.append({"kind": "scene", "time_sec": float(t), "path": f}) + else: + for f in scene_files[:p.max_keyframes]: + outputs.append({"kind": "scene", "time_sec": None, "path": f}) + + out_json = os.path.join(artifacts, p.out_json_name) + payload = { + "input": video_path, + "out_dir": out_dir, + "scene_threshold": p.scene_threshold, + "used_scene_threshold": used_thr, + "max_keyframes": p.max_keyframes, + "min_interval_sec": p.min_interval_sec, + "always_include_first": p.always_include_first, + "keyframes": outputs, + } + with open(out_json, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + + return { + "out_dir": out_dir, + "keyframes_json": out_json, + "keyframes_dir": key_dir, + } + + +if __name__ == "__main__": + import argparse + + ap = argparse.ArgumentParser() + ap.add_argument("--video", required=True) + ap.add_argument("--out_dir", required=True) + ap.add_argument("--scene_threshold", type=float, default=0.15) + ap.add_argument("--max_keyframes", type=int, default=30) + ap.add_argument("--min_interval_sec", type=float, default=1.0) + ap.add_argument("--always_include_first", action="store_true") + args = ap.parse_args() + + runner = VideoKeyframeExtractLocal() + res = runner.run( + video_path=args.video, + out_dir=args.out_dir, + params={ + "scene_threshold": args.scene_threshold, + "max_keyframes": args.max_keyframes, + "min_interval_sec": args.min_interval_sec, + "always_include_first": bool(args.always_include_first), + }, + ) + print(json.dumps(res, ensure_ascii=False, indent=2)) \ No newline at end of file diff --git a/runtime/ops/mapper/video_mot_track/__init__.py b/runtime/ops/mapper/video_mot_track/__init__.py new file mode 100644 index 00000000..9d9954c8 --- /dev/null +++ b/runtime/ops/mapper/video_mot_track/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoMotTrack", + module_path="ops.mapper.video_mot_track.process", +) diff --git a/runtime/ops/mapper/video_mot_track/configs/bytetrack.yaml b/runtime/ops/mapper/video_mot_track/configs/bytetrack.yaml new file mode 100644 index 00000000..c8cdead2 --- /dev/null +++ b/runtime/ops/mapper/video_mot_track/configs/bytetrack.yaml @@ -0,0 +1,7 @@ +tracker_type: bytetrack +track_high_thresh: 0.25 +track_low_thresh: 0.1 +new_track_thresh: 0.25 +track_buffer: 30 +match_thresh: 0.8 +fuse_score: True \ No newline at end of file diff --git a/runtime/ops/mapper/video_mot_track/metadata.yml b/runtime/ops/mapper/video_mot_track/metadata.yml new file mode 100644 index 00000000..24c0ff52 --- /dev/null +++ b/runtime/ops/mapper/video_mot_track/metadata.yml @@ -0,0 +1,16 @@ +name: '多目标跟踪' +name_en: 'Video MOT Track' +description: '基于检测+跟踪生成轨迹文件 tracks.json,并输出 debug.mp4 用于可视化验收。' +description_en: 'Run detection+tracking to generate tracks.json and debug.mp4 for visualization.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoMotTrack' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_mot_track/process.py b/runtime/ops/mapper/video_mot_track/process.py new file mode 100644 index 00000000..b3a1010c --- /dev/null +++ b/runtime/ops/mapper/video_mot_track/process.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +import os +import json +import cv2 +import shutil + +from ultralytics import YOLO + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from .._video_common.schema import init_tracks_schema +from .._video_common.model_paths import resolve_model_path + + +class VideoMotTrack: + """多目标跟踪(YOLO + ByteTrack) + + 权重策略(模型仓): + DATAMATE_MODEL_ROOT=/mnt/models + 默认权重:/mnt/models/yolo/yolov8n.pt + params: + - model_root: 可选,覆盖 DATAMATE_MODEL_ROOT + - yolo_model: 可选,权重路径(相对/绝对均可) + - conf: default 0.3 + - iou: default 0.5 + - classes: "0,2,3" or None + - tracker_cfg: bytetrack yaml 路径(默认算子 configs/bytetrack.yaml) + - save_debug: default True + outputs: + - tracks.json + - debug.mp4 (optional) + """ + + def execute(self, sample: dict, params: dict = None): + params = params or {} + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + out_dir = make_run_dir(export_path, "video_mot_track") + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + logger = get_logger("VideoMotTrack", log_dir) + + # YOLO config dir(避免写到不可写目录) + os.environ.setdefault("YOLO_CONFIG_DIR", os.path.join(out_dir, "yolo_cfg")) + os.makedirs(os.environ["YOLO_CONFIG_DIR"], exist_ok=True) + + # ✅ 模型仓默认权重 + yolo_model = resolve_model_path(params, "yolo_model", "yolo/yolov8n.pt") + + conf = float(params.get("conf", 0.3)) + iou = float(params.get("iou", 0.5)) + classes = params.get("classes", None) # "0,2,3" or None + tracker_cfg = params.get("tracker_cfg", os.path.join(os.path.dirname(__file__), "configs/bytetrack.yaml")) + save_debug = bool(params.get("save_debug", True)) + + cls_list = None + if classes: + cls_list = [int(x.strip()) for x in str(classes).split(",") if x.strip() != ""] + + fps, W, H, _ = get_video_info(video_path) + tracks = init_tracks_schema(video_path, fps, W, H) + + debug_path = os.path.join(art_dir, "debug.mp4") + debug_writer = None + if save_debug: + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + debug_writer = cv2.VideoWriter(debug_path, fourcc, fps, (W, H)) + + logger.info(f"Start tracking. video={video_path}, model={yolo_model}, conf={conf}, iou={iou}, classes={classes}") + if not os.path.exists(yolo_model): + raise RuntimeError(f"YOLO weight not found: {yolo_model}. Please download to model repo path.") + + model = YOLO(yolo_model) + results_iter = model.track( + source=video_path, + conf=conf, + iou=iou, + classes=cls_list, + tracker=tracker_cfg, + stream=True, + verbose=False, + ) + + frame_idx = 0 + for r in results_iter: + frame = r.orig_img + objs = [] + if r.boxes is not None and r.boxes.id is not None: + ids = r.boxes.id.cpu().numpy().tolist() + xyxy = r.boxes.xyxy.cpu().numpy().tolist() + confs = r.boxes.conf.cpu().numpy().tolist() + clss = r.boxes.cls.cpu().numpy().tolist() + for tid, bb, sc, cc in zip(ids, xyxy, confs, clss): + x1, y1, x2, y2 = bb + objs.append({ + "track_id": int(tid), + "bbox": [float(x1), float(y1), float(x2), float(y2)], + "score": float(sc), + "cls_id": int(cc), + }) + if debug_writer is not None: + cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0,255,0), 2) + cv2.putText(frame, f"id={int(tid)}", (int(x1), int(y1)-5), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2) + tracks["frames"].append({"frame_idx": frame_idx, "objects": objs}) + + if debug_writer is not None: + debug_writer.write(frame) + frame_idx += 1 + + if debug_writer is not None: + debug_writer.release() + + tracks_path = os.path.join(art_dir, "tracks.json") + with open(tracks_path, "w", encoding="utf-8") as f: + json.dump(tracks, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. tracks_json={tracks_path}") + out = {"out_dir": out_dir, "tracks_json": tracks_path} + if save_debug: + out["debug_mp4"] = debug_path + return out \ No newline at end of file diff --git a/runtime/ops/mapper/video_sensitive_crop/__init__.py b/runtime/ops/mapper/video_sensitive_crop/__init__.py new file mode 100644 index 00000000..ff8912df --- /dev/null +++ b/runtime/ops/mapper/video_sensitive_crop/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoSensitiveCrop", + module_path="ops.mapper.video_sensitive_crop.process", +) diff --git a/runtime/ops/mapper/video_sensitive_crop/metadata.yml b/runtime/ops/mapper/video_sensitive_crop/metadata.yml new file mode 100644 index 00000000..40398330 --- /dev/null +++ b/runtime/ops/mapper/video_sensitive_crop/metadata.yml @@ -0,0 +1,16 @@ +name: '视频敏感裁剪' +name_en: 'Video Sensitive Crop' +description: '根据敏感片段 JSON 裁剪/清洗输出 cleaned.mp4,并生成 crop_result.json。' +description_en: 'Crop/clean video based on sensitive segments JSON; outputs cleaned.mp4 and crop_result.json.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoSensitiveCrop' +version: '1.0.0' +types: + - 'cleaning' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'video' \ No newline at end of file diff --git a/runtime/ops/mapper/video_sensitive_crop/process.py b/runtime/ops/mapper/video_sensitive_crop/process.py new file mode 100644 index 00000000..154c1f37 --- /dev/null +++ b/runtime/ops/mapper/video_sensitive_crop/process.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +import os +import json + +from .._video_common.paths import make_run_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from .._video_common.ffmpeg import cut_segment, concat_segments +from ..video_sensitive_detect.process import VideoSensitiveDetect + + +def complement_intervals(segments, duration): + if not segments: + return [[0.0, duration]] + + segs = sorted([(float(x["start"]), float(x["end"])) for x in segments], key=lambda x: x[0]) + + # merge + merged = [] + cs, ce = segs[0] + for s, e in segs[1:]: + if s <= ce: + ce = max(ce, e) + else: + merged.append([cs, ce]) + cs, ce = s, e + merged.append([cs, ce]) + + keep = [] + prev = 0.0 + for s, e in merged: + s = max(0.0, min(duration, s)) + e = max(0.0, min(duration, e)) + if s > prev: + keep.append([prev, s]) + prev = max(prev, e) + if prev < duration: + keep.append([prev, duration]) + + return [[s, e] for s, e in keep if e - s >= 0.05] + + +class VideoSensitiveCrop: + """ + 敏感裁剪:默认 remove(剔除敏感段) + params: + - segments_json: 必填(video_sensitive_detect 输出) + - keep_mode: "remove" 或 "keep"(默认 remove) + - out_name: 默认 cleaned.mp4 + """ + def execute(self, sample: dict, params: dict = None): + params = params or {} + video_path = sample["filePath"] + export_path = sample["export_path"] + + out_dir = make_run_dir(export_path, "video_sensitive_crop") + logger = get_logger("VideoSensitiveCrop", log_dir=out_dir) + + + segments_json = params.get("segments_json", "") + # 如果没传 segments_json,就自动先跑 VideoSensitiveDetect 生成 + if (not segments_json) or (not os.path.exists(segments_json)): + # detect_params 优先从 params["detect_params"] 读取;否则从当前 params 里抽取 detect 所需字段 + detect_params = params.get("detect_params", None) + if detect_params is None: + detect_keys = ["qwen_module", "qwen_func", "sample_fps", "threshold", "merge_gap", "prompt"] + detect_params = {k: params[k] for k in detect_keys if k in params} + + # VideoSensitiveDetect 里 qwen_module 是必填的;没给就明确报错(避免你后面裁剪时不知道为什么没生成) + if "qwen_module" not in detect_params: + raise RuntimeError( + "VideoSensitiveCrop: segments_json not provided, and detect_params missing required 'qwen_module'. " + "Please pass params['qwen_module'] (and optional qwen_func/sample_fps/threshold/merge_gap)." + ) + + logger.info("segments_json not provided; run VideoSensitiveDetect first to generate sensitive_segments.json") + det_out = VideoSensitiveDetect().execute(sample, detect_params) + + # 兼容不同返回 key:尽量从 det_out 中找出 json 路径 + for key in [ + "segments_json", + "sensitive_segments_json", + "sensitive_segments_path", + "json_path", + "output_json", + ]: + if key in det_out and det_out[key] and os.path.exists(det_out[key]): + segments_json = det_out[key] + break + + # 如果 detect 没把路径通过 return 带出来,就回退到 out_dir 默认文件名(你的 detect 默认写 sensitive_segments.json) + if (not segments_json) or (not os.path.exists(segments_json)): + fallback = os.path.join(out_dir, "sensitive_segments.json") + if os.path.exists(fallback): + segments_json = fallback + + if (not segments_json) or (not os.path.exists(segments_json)): + raise RuntimeError("VideoSensitiveCrop: failed to obtain sensitive segments json from detect step.") + + + + + keep_mode = params.get("keep_mode", "remove") + out_name = params.get("out_name", "cleaned.mp4") + out_video = os.path.join(out_dir, out_name) + + det = json.load(open(segments_json, "r", encoding="utf-8")) + segments = det.get("segments", []) + + fps, W, H, nframes = get_video_info(video_path) + duration = nframes / float(fps) if fps > 0 else 0.0 + + if keep_mode == "remove": + keep_intervals = complement_intervals(segments, duration) + elif keep_mode == "keep": + keep_intervals = [[float(x["start"]), float(x["end"])] for x in segments] + else: + raise ValueError("keep_mode must be 'remove' or 'keep'") + + logger.info(f"Start crop. mode={keep_mode}, keep_intervals={len(keep_intervals)}, duration={duration:.2f}s") + + if not keep_intervals: + logger.info("No intervals to keep. Copy original as output.") + cut_segment(video_path, out_video, 0.0, duration, logger=logger) + else: + seg_dir = os.path.join(out_dir, "segments") + os.makedirs(seg_dir, exist_ok=True) + + seg_files = [] + for i, (s, e) in enumerate(keep_intervals): + seg_path = os.path.join(seg_dir, f"seg_{i:04d}.mp4") + cut_segment(video_path, seg_path, s, e, logger=logger) + seg_files.append(seg_path) + + concat_segments(seg_files, out_video, logger=logger) + + result = { + "out_dir": out_dir, + "input": video_path, + "segments_json": segments_json, + "keep_mode": keep_mode, + "output_video": out_video, + "kept_intervals": keep_intervals, + } + json_path = os.path.join(out_dir, "crop_result.json") + with open(json_path, "w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. output={out_video}") + return result \ No newline at end of file diff --git a/runtime/ops/mapper/video_sensitive_detect/__init__.py b/runtime/ops/mapper/video_sensitive_detect/__init__.py new file mode 100644 index 00000000..c03c3c42 --- /dev/null +++ b/runtime/ops/mapper/video_sensitive_detect/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoSensitiveDetect", + module_path="ops.mapper.video_sensitive_detect.process", +) diff --git a/runtime/ops/mapper/video_sensitive_detect/metadata.yml b/runtime/ops/mapper/video_sensitive_detect/metadata.yml new file mode 100644 index 00000000..7f51a044 --- /dev/null +++ b/runtime/ops/mapper/video_sensitive_detect/metadata.yml @@ -0,0 +1,16 @@ +name: '视频敏感检测' +name_en: 'Video Sensitive Detect' +description: '抽帧+Qwen判定生成敏感片段 sensitive_segments.json(需要提供 qwen_module/qwen_func)。' +description_en: 'Sample frames and call Qwen inference to generate sensitive_segments.json (requires qwen_module/qwen_func).' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoSensitiveDetect' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_sensitive_detect/process.py b/runtime/ops/mapper/video_sensitive_detect/process.py new file mode 100644 index 00000000..a5707065 --- /dev/null +++ b/runtime/ops/mapper/video_sensitive_detect/process.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +import os +import json +import cv2 + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from .._video_common.qwen_http_client import qwenvl_infer_by_image_path, save_frame_to_jpg + + +def _merge_times_to_segments(times, gap=1.5, pad=0.5): + if not times: + return [] + times = sorted(times) + segs = [] + s = times[0] + prev = times[0] + for t in times[1:]: + if t - prev <= gap: + prev = t + else: + segs.append([max(0.0, s - pad), prev + pad]) + s = t + prev = t + segs.append([max(0.0, s - pad), prev + pad]) + return segs + + +class VideoSensitiveDetect: + """ + 抽帧 + QwenVL HTTP 敏感检测(对齐 qwen_vl_server.py): + + 服务端: + POST {service_url}/infer + JSON: {image_path, task="sensitive", max_new_tokens, language, style} + 返回: {is_sensitive,label,score,reason} + + params: + - service_url: 默认 http://127.0.0.1:18080 + - timeout_sec: 默认 180 + - sample_fps: 默认 1.0 + - threshold: 默认 0.5 + - merge_gap: 默认 1.5 + - pad_sec: 默认 0.5 + - max_new_tokens: 默认 8 + outputs: + - out_dir/sensitive_segments.json + """ + + def execute(self, sample: dict, params: dict = None): + params = params or {} + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + out_dir = make_run_dir(export_path, "video_sensitive_detect") + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + frames_dir = ensure_dir(os.path.join(art_dir, "frames")) + logger = get_logger("VideoSensitiveDetect", log_dir) + + service_url = params.get("service_url", "http://127.0.0.1:18080") + timeout_sec = int(params.get("timeout_sec", 180)) + sample_fps = float(params.get("sample_fps", 1.0)) + threshold = float(params.get("threshold", 0.5)) + merge_gap = float(params.get("merge_gap", 1.5)) + pad_sec = float(params.get("pad_sec", 0.5)) + max_new_tokens = int(params.get("max_new_tokens", 8)) + + fps, W, H, total_frames = get_video_info(video_path) + step = max(1, int(round(float(fps) / max(sample_fps, 1e-6)))) + + logger.info( + f"Start sensitive detect. video={video_path}, fps={fps}, step={step}, " + f"url={service_url}, thr={threshold}, gap={merge_gap}" + ) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {video_path}") + + hits = [] + sensitive_times = [] + + frame_id = 0 + while True: + ok, frame = cap.read() + if not ok: + break + + if frame_id % step != 0: + frame_id += 1 + continue + + t = frame_id / float(fps) if fps else 0.0 + frame_jpg = os.path.join(frames_dir, f"{frame_id:06d}.jpg") + save_frame_to_jpg(frame, frame_jpg) + + try: + res = qwenvl_infer_by_image_path( + image_path=frame_jpg, + task="sensitive", + service_url=service_url, + max_new_tokens=max_new_tokens, + timeout=timeout_sec, + ) + except Exception as e: + logger.error(f"infer failed at t={t:.2f}s frame={frame_id}: {repr(e)}") + frame_id += 1 + continue + + is_sensitive = bool(res.get("is_sensitive", False)) + score = float(res.get("score", 0.0)) + label = str(res.get("label", "none")) + reason = str(res.get("reason", "")) + + hits.append( + { + "time": t, + "frame_idx": frame_id, + "image_path": frame_jpg, + "is_sensitive": is_sensitive, + "label": label, + "score": score, + "reason": reason, + } + ) + + if is_sensitive and score >= threshold: + sensitive_times.append(t) + + frame_id += 1 + + cap.release() + + segs = _merge_times_to_segments(sensitive_times, gap=merge_gap, pad=pad_sec) + + result = { + "out_dir": out_dir, + "video": video_path, + "service_url": service_url, + "sample_fps": sample_fps, + "threshold": threshold, + "merge_gap": merge_gap, + "pad_sec": pad_sec, + "hits": hits, + "segments": [{"start": float(s), "end": float(e)} for s, e in segs], + } + + json_path = os.path.join(out_dir, "sensitive_segments.json") + with open(json_path, "w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. segments_json={json_path}, segments={len(segs)}, hits={len(hits)}") + return {"out_dir": out_dir, "segments_json": json_path, "segments_count": len(segs)} \ No newline at end of file diff --git a/runtime/ops/mapper/video_speech_asr/__init__.py b/runtime/ops/mapper/video_speech_asr/__init__.py new file mode 100644 index 00000000..cc00c40d --- /dev/null +++ b/runtime/ops/mapper/video_speech_asr/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoSpeechASR", + module_path="ops.mapper.video_speech_asr.process", +) diff --git a/runtime/ops/mapper/video_speech_asr/metadata.yml b/runtime/ops/mapper/video_speech_asr/metadata.yml new file mode 100644 index 00000000..0847fa47 --- /dev/null +++ b/runtime/ops/mapper/video_speech_asr/metadata.yml @@ -0,0 +1,16 @@ +name: '语音识别ASR' +name_en: 'Video Speech ASR' +description: '从视频抽取音频并进行语音识别,输出 asr.json(可含时间戳);支持指定语言/模型规模等参数。' +description_en: 'Extract audio and run ASR, outputs asr.json (with timestamps); supports language/model options.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoSpeechASR' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_speech_asr/process.py b/runtime/ops/mapper/video_speech_asr/process.py new file mode 100644 index 00000000..71169899 --- /dev/null +++ b/runtime/ops/mapper/video_speech_asr/process.py @@ -0,0 +1,213 @@ +# -*- coding: utf-8 -*- +import os +import json +import shutil +import subprocess +import re + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger + + +def _write_srt(segments, srt_path): + def _fmt(t): + h = int(t // 3600) + m = int((t % 3600) // 60) + s = int(t % 60) + ms = int(round((t - int(t)) * 1000)) + return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" + + with open(srt_path, "w", encoding="utf-8") as f: + for i, seg in enumerate(segments, 1): + f.write(str(i) + "\n") + f.write(f"{_fmt(seg['start'])} --> {_fmt(seg['end'])}\n") + f.write((seg.get("text") or "").strip() + "\n\n") + + +def _contains_cjk(s: str) -> bool: + return bool(re.search(r"[\u4e00-\u9fff]", s or "")) + + +def _to_simplified(text: str) -> str: + try: + from opencc import OpenCC + return OpenCC("t2s").convert(text) + except Exception: + return text + + +class VideoSpeechASR: + """语音转文字(优先 faster-whisper;失败自动回退 openai-whisper) + + params: + - ffmpeg_path: str, optional + - model: tiny|base|small|medium|large-v3, default small + - language: auto|zh|en, default zh + - beam_size: int, default 5 + - vad_filter: bool, default True + - compute_type: int8|int8_float16|float16|float32, default int8 + - sample_rate: int, default 16000 + - channels: int, default 1 + - max_audio_sec: float, optional + - zh_script: simplified|traditional|keep, default simplified + + # 离线/本地模型(faster-whisper) + - fw_model_path: str, optional # 本地模型路径(目录) + - fw_download_root: str, optional + - local_files_only: bool, default False + + outputs: + - artifacts/audio.wav + - artifacts/asr.json / asr.txt / asr.srt + - artifacts/asr_backend.json(记录用了哪个后端/异常信息) + """ + + @staticmethod + def execute(sample, params): + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + op_name = "video_speech_asr" + out_dir = make_run_dir(export_path, op_name) + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + + logger = get_logger(op_name, log_dir) + logger.info(f"video={video_path}") + logger.info(f"out_dir={out_dir}") + + ffmpeg_path = params.get("ffmpeg_path") or shutil.which("ffmpeg") + if not ffmpeg_path: + raise RuntimeError("ffmpeg not found. Please install ffmpeg or pass params.ffmpeg_path") + + sr = int(params.get("sample_rate", 16000)) + ch = int(params.get("channels", 1)) + max_audio_sec = params.get("max_audio_sec", None) + max_audio_sec = float(max_audio_sec) if max_audio_sec is not None else None + + audio_path = os.path.join(art_dir, "audio.wav") + cmd = [ + ffmpeg_path, "-hide_banner", "-y", + "-i", video_path, + "-vn", + "-ac", str(ch), + "-ar", str(sr), + "-c:a", "pcm_s16le", + ] + if max_audio_sec is not None and max_audio_sec > 0: + cmd += ["-t", f"{max_audio_sec}"] + cmd += [audio_path] + + logger.info("FFmpeg cmd: " + " ".join(cmd)) + p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + raise RuntimeError(f"FFmpeg failed (code={p.returncode}).\nSTDERR:\n{p.stderr}") + + model_name = (params.get("model", "small") or "small") + language = (params.get("language", "zh") or "zh").lower() + beam_size = int(params.get("beam_size", 5)) + vad_filter = bool(params.get("vad_filter", True)) + compute_type = (params.get("compute_type", "int8") or "int8") + zh_script = (params.get("zh_script", "simplified") or "simplified").lower() + + fw_model_path = params.get("fw_model_path", None) + fw_download_root = params.get("fw_download_root", None) + local_files_only = bool(params.get("local_files_only", False)) + + segments = [] + full_text = "" + backend_info = {"backend": None, "error": None} + + # ===== try faster-whisper ===== + try: + from faster_whisper import WhisperModel + backend_info["backend"] = "faster-whisper" + + # 离线策略:local_files_only 时,把 HF 的联网行为尽量关掉 + if local_files_only: + os.environ.setdefault("HF_HUB_OFFLINE", "1") + os.environ.setdefault("TRANSFORMERS_OFFLINE", "1") + + model_id = fw_model_path or model_name + logger.info(f"[ASR] faster-whisper load model={model_id}, compute_type={compute_type}, offline={local_files_only}") + + fw = WhisperModel( + model_id, + device="cpu", + compute_type=compute_type, + download_root=fw_download_root, + ) + + logger.info("[ASR] faster-whisper transcribe start...") + seg_iter, info = fw.transcribe( + audio_path, + language=None if language == "auto" else language, + beam_size=beam_size, + vad_filter=vad_filter, + ) + for s in seg_iter: + segments.append({"start": float(s.start), "end": float(s.end), "text": (s.text or "").strip()}) + full_text = " ".join([s["text"] for s in segments]).strip() + logger.info("[ASR] faster-whisper transcribe done.") + + except Exception as e: + # ===== fallback openai-whisper ===== + backend_info["backend"] = "openai-whisper" + backend_info["error"] = f"faster-whisper failed: {repr(e)}" + logger.warning("[ASR] faster-whisper failed, fallback openai-whisper. reason=" + repr(e)) + + try: + import whisper + except Exception as e2: + raise RuntimeError("ASR backend failed. Please install: pip install faster-whisper openai-whisper") from e2 + + logger.info(f"[ASR] openai-whisper load model={model_name} (slow on CPU)") + wmodel = whisper.load_model(model_name) + + wargs = {"fp16": False, "verbose": False} + if language != "auto": + wargs["language"] = language + + logger.info("[ASR] openai-whisper transcribe start...") + result = wmodel.transcribe(audio_path, **wargs) + logger.info("[ASR] openai-whisper transcribe done.") + + for seg in result.get("segments", []): + segments.append({ + "start": float(seg.get("start", 0.0)), + "end": float(seg.get("end", 0.0)), + "text": (seg.get("text") or "").strip() + }) + full_text = (result.get("text") or "").strip() + + # 简体化 + if zh_script == "simplified": + if _contains_cjk(full_text): + full_text = _to_simplified(full_text) + for s in segments: + if _contains_cjk(s["text"]): + s["text"] = _to_simplified(s["text"]) + + json_path = os.path.join(art_dir, "asr.json") + txt_path = os.path.join(art_dir, "asr.txt") + srt_path = os.path.join(art_dir, "asr.srt") + backend_path = os.path.join(art_dir, "asr_backend.json") + + with open(json_path, "w", encoding="utf-8") as f: + json.dump({"text": full_text, "segments": segments}, f, ensure_ascii=False, indent=2) + with open(txt_path, "w", encoding="utf-8") as f: + f.write(full_text + "\n") + _write_srt(segments, srt_path) + + with open(backend_path, "w", encoding="utf-8") as f: + json.dump(backend_info, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. segments={len(segments)} asr_json={json_path}") + return { + "out_dir": out_dir, + "audio_wav": audio_path, + "asr_json": json_path, + "asr_txt": txt_path, + "asr_srt": srt_path, + "asr_backend": backend_path, + } \ No newline at end of file diff --git a/runtime/ops/mapper/video_subject_crop/__init__.py b/runtime/ops/mapper/video_subject_crop/__init__.py new file mode 100644 index 00000000..b4bc44bd --- /dev/null +++ b/runtime/ops/mapper/video_subject_crop/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoSubjectCrop", + module_path="ops.mapper.video_subject_crop.process", +) diff --git a/runtime/ops/mapper/video_subject_crop/metadata.yml b/runtime/ops/mapper/video_subject_crop/metadata.yml new file mode 100644 index 00000000..45107f9c --- /dev/null +++ b/runtime/ops/mapper/video_subject_crop/metadata.yml @@ -0,0 +1,16 @@ +name: '主体跟踪裁剪' +name_en: 'Video Subject Crop' +description: '根据 tracks.json 选择 Top1 主体轨迹并裁剪输出 subject.mp4,用于单主体验收链路。' +description_en: 'Select the top subject track from tracks.json and crop to output subject.mp4.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoSubjectCrop' +version: '1.0.0' +types: + - 'cleaning' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'video' \ No newline at end of file diff --git a/runtime/ops/mapper/video_subject_crop/process.py b/runtime/ops/mapper/video_subject_crop/process.py new file mode 100644 index 00000000..fb870c15 --- /dev/null +++ b/runtime/ops/mapper/video_subject_crop/process.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +import os +import json +import cv2 + +from .._video_common.paths import make_run_dir +from .._video_common.log import get_logger +from ..video_mot_track.process import VideoMotTrack + +def _bbox_area(b): + x1, y1, x2, y2 = b + return max(0.0, x2 - x1) * max(0.0, y2 - y1) + +def _select_top1_track(tracks: dict, min_frames: int = 10): + stats = {} # tid -> {"count":int, "area_sum":float} + for fr in tracks.get("frames", []): + for obj in fr.get("objects", []): + tid = int(obj["track_id"]) + area = _bbox_area(obj["bbox"]) + if tid not in stats: + stats[tid] = {"count": 0, "area_sum": 0.0} + stats[tid]["count"] += 1 + stats[tid]["area_sum"] += area + + items = [] + for tid, s in stats.items(): + if s["count"] < min_frames: + continue + avg_area = s["area_sum"] / max(1, s["count"]) + items.append((tid, s["count"], avg_area)) + + if not items: + return None + + items.sort(key=lambda x: (x[1], x[2]), reverse=True) + return int(items[0][0]) + +def _clamp(val, lo, hi): + return max(lo, min(hi, val)) + +def _ema(prev_bbox, bbox, alpha=0.8): + if prev_bbox is None: + return bbox + return [ + alpha*prev_bbox[0] + (1-alpha)*bbox[0], + alpha*prev_bbox[1] + (1-alpha)*bbox[1], + alpha*prev_bbox[2] + (1-alpha)*bbox[2], + alpha*prev_bbox[3] + (1-alpha)*bbox[3], + ] + +def _expand_bbox(bbox, margin, W, H): + x1, y1, x2, y2 = bbox + w = x2 - x1 + h = y2 - y1 + x1 = x1 - w * margin + y1 = y1 - h * margin + x2 = x2 + w * margin + y2 = y2 + h * margin + x1 = _clamp(int(x1), 0, W-1) + y1 = _clamp(int(y1), 0, H-1) + x2 = _clamp(int(x2), 0, W-1) + y2 = _clamp(int(y2), 0, H-1) + if x2 <= x1: x2 = min(W-1, x1+1) + if y2 <= y1: y2 = min(H-1, y1+1) + return [x1, y1, x2, y2] + +class VideoSubjectCrop: + """ + 主体追踪裁剪(Top1): + 输入: + - sample["filePath"] + - sample["export_path"] + - params["tracks_json"] (可选:不提供就自动找同一次 run 的 tracks.json) + 输出: + - subjects/subject.mp4 + - subjects/subject_track_id.txt + """ + def execute(self, sample: dict, params: dict = None): + params = params or {} + video_path = sample["filePath"] + export_path = sample["export_path"] + + out_dir = make_run_dir(export_path, "video_subject_crop") + logger = get_logger("VideoSubjectCrop", log_dir=out_dir) + + tracks_json = params.get("tracks_json", None) + if (not tracks_json) or (not os.path.exists(tracks_json)): + # 自动跑 MOT 生成 tracks.json + mot_params = params.get("mot_params", {}) # 可选:把 mot 的参数也透传进来 + logger.info("tracks_json not provided; run VideoMotTrack first to generate tracks.json") + mot_out = VideoMotTrack().execute(sample, mot_params) + tracks_json = mot_out["tracks_json"] + + crop_size = int(params.get("crop_size", 512)) + margin = float(params.get("margin", 0.15)) + smooth_alpha = float(params.get("smooth_alpha", 0.8)) + min_frames = int(params.get("min_frames", 10)) + fill_missing = bool(params.get("fill_missing", False)) + + with open(tracks_json, "r", encoding="utf-8") as f: + tracks = json.load(f) + + fps = float(tracks["fps"]) + W = int(tracks["width"]) + H = int(tracks["height"]) + + subject_id = _select_top1_track(tracks, min_frames=min_frames) + if subject_id is None: + raise RuntimeError(f"No valid subject track found (min_frames={min_frames}).") + + subjects_dir = os.path.join(out_dir, "subjects") + os.makedirs(subjects_dir, exist_ok=True) + + with open(os.path.join(subjects_dir, "subject_track_id.txt"), "w", encoding="utf-8") as f: + f.write(str(subject_id)) + + out_video = os.path.join(subjects_dir, "subject.mp4") + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {video_path}") + + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(out_video, fourcc, fps, (crop_size, crop_size)) + + last_bbox = None + frame_id = 0 + + logger.info(f"Start subject crop. subject_id={subject_id}, tracks={tracks_json}") + + while True: + ret, frame = cap.read() + if not ret: + break + + bbox = None + if frame_id < len(tracks.get("frames", [])): + objs = tracks["frames"][frame_id].get("objects", []) + for obj in objs: + if int(obj["track_id"]) == int(subject_id): + bbox = obj["bbox"] + break + + if bbox is None: + if fill_missing and last_bbox is not None: + bbox_s = last_bbox + else: + frame_id += 1 + continue + else: + bbox_s = _ema(last_bbox, bbox, alpha=smooth_alpha) + last_bbox = bbox_s + + bbox_e = _expand_bbox(bbox_s, margin=margin, W=W, H=H) + x1, y1, x2, y2 = bbox_e + crop = frame[y1:y2, x1:x2] + if crop.size == 0: + frame_id += 1 + continue + + crop = cv2.resize(crop, (crop_size, crop_size), interpolation=cv2.INTER_LINEAR) + writer.write(crop) + + frame_id += 1 + + cap.release() + writer.release() + + logger.info(f"Done. subject_video={out_video}") + + return { + "out_dir": out_dir, + "subject_track_id": subject_id, + "subject_video": out_video, + } \ No newline at end of file diff --git a/runtime/ops/mapper/video_subtitle_ocr/__init__.py b/runtime/ops/mapper/video_subtitle_ocr/__init__.py new file mode 100644 index 00000000..5460c2f8 --- /dev/null +++ b/runtime/ops/mapper/video_subtitle_ocr/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoSubtitleOCR", + module_path="ops.mapper.video_subtitle_ocr.process", +) diff --git a/runtime/ops/mapper/video_subtitle_ocr/metadata.yml b/runtime/ops/mapper/video_subtitle_ocr/metadata.yml new file mode 100644 index 00000000..98948bdd --- /dev/null +++ b/runtime/ops/mapper/video_subtitle_ocr/metadata.yml @@ -0,0 +1,16 @@ +name: '视频字幕OCR提取' +name_en: 'Video Subtitle OCR' +description: '对视频底部字幕区域进行OCR识别,输出 subtitles.json 与 subtitles.srt;可选自动去黑边、抽帧、跳过相似帧、字幕去重合并、英文空格修复。' +description_en: 'OCR for bottom subtitles, outputs subtitles.json and subtitles.srt; optional deborder, sampling, frame skipping, merge, English spacing fix.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoSubtitleOCR' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_subtitle_ocr/process.py b/runtime/ops/mapper/video_subtitle_ocr/process.py new file mode 100644 index 00000000..2c58753f --- /dev/null +++ b/runtime/ops/mapper/video_subtitle_ocr/process.py @@ -0,0 +1,406 @@ +# -*- coding: utf-8 -*- +import os +import json +import re +import shutil +import subprocess +import cv2 +import numpy as np +import inspect + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from paddleocr import PaddleOCR +from .._video_common.model_paths import resolve_model_path + +def build_paddle_ocr(params, ocr_lang: str, use_angle_cls: bool): + """ + 默认模型目录: + /mnt/models/ocr/det + /mnt/models/ocr/rec + /mnt/models/ocr/cls + 也支持 params['ocr_model_dir'] 指定(相对/绝对)。 + """ + ocr_root = resolve_model_path(params, "ocr_model_dir", "ocr") + det_dir = os.path.join(ocr_root, "det") + rec_dir = os.path.join(ocr_root, "rec") + cls_dir = os.path.join(ocr_root, "cls") + + # 目录不存在就直接报错,让用户去模型仓下载到固定位置 + for p in [det_dir, rec_dir] + ([cls_dir] if use_angle_cls else []): + if not os.path.exists(p): + raise RuntimeError(f"PaddleOCR model dir not found: {p}. Please download OCR models into model repo path.") + + sig = inspect.signature(PaddleOCR.__init__) + kw = {"lang": ocr_lang} + if "use_angle_cls" in sig.parameters: + kw["use_angle_cls"] = use_angle_cls + # PaddleOCR 3.4.0 支持这些 + if "det_model_dir" in sig.parameters: + kw["det_model_dir"] = det_dir + if "rec_model_dir" in sig.parameters: + kw["rec_model_dir"] = rec_dir + if "cls_model_dir" in sig.parameters and use_angle_cls: + kw["cls_model_dir"] = cls_dir + + return PaddleOCR(**kw) +def _write_srt(segments, srt_path): + def _fmt(t): + h = int(t // 3600) + m = int((t % 3600) // 60) + s = int(t % 60) + ms = int(round((t - int(t)) * 1000)) + return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" + + with open(srt_path, "w", encoding="utf-8") as f: + for i, seg in enumerate(segments, 1): + f.write(str(i) + "\n") + f.write(f"{_fmt(seg['start'])} --> {_fmt(seg['end'])}\n") + f.write((seg.get("text") or "").strip() + "\n\n") + + +def _clean_text(t: str) -> str: + if not t: + return "" + t = t.strip() + t = re.sub(r"\s+", " ", t) + return t + + +def _english_ratio(text: str) -> float: + if not text: + return 0.0 + letters = sum(c.isalpha() for c in text) + return letters / max(1, len(text)) + + +def _fix_english_spacing(text: str) -> str: + """英文字幕空格修复(轻量规则,避免影响中文)""" + if not text: + return text + if _english_ratio(text) < 0.40: + return text + + t = text + + # 小写后接大写:ThisIs -> This Is + t = re.sub(r"([a-z])([A-Z])", r"\1 \2", t) + + # 字母数字边界:A1 / 1A + t = re.sub(r"([A-Za-z])(\d)", r"\1 \2", t) + t = re.sub(r"(\d)([A-Za-z])", r"\1 \2", t) + + # 标点前去空格,标点后若紧跟字母则补空格(保守) + t = re.sub(r"\s+([,.;:?!])", r"\1", t) + t = re.sub(r"([,.;:?!])([A-Za-z])", r"\1 \2", t) + + # 多空格压缩 + t = re.sub(r"\s+", " ", t).strip() + return t + + +def _norm_sub_key(text: str) -> str: + """用于合并的规范化 key:空格归一、末尾标点归一、英文小写化""" + if not text: + return "" + t = text.strip() + t = re.sub(r"\s+", " ", t) + # 去掉末尾重复标点(中英文都考虑) + t = re.sub(r"[.。!?!?]+$", "", t).strip() + + # 英文占比高则统一小写,便于合并 + if _english_ratio(t) > 0.40: + t = t.lower() + + return t + + +def _roi_changed(cur_roi, last_roi, diff_thr=4.0): + """diff_thr 调低一点更敏感,避免跳过字幕变化""" + if last_roi is None: + return True + a = cv2.cvtColor(cur_roi, cv2.COLOR_BGR2GRAY) + b = cv2.cvtColor(last_roi, cv2.COLOR_BGR2GRAY) + if a.shape != b.shape: + b = cv2.resize(b, (a.shape[1], a.shape[0]), interpolation=cv2.INTER_AREA) + diff = np.mean(np.abs(a.astype(np.float32) - b.astype(np.float32))) + return diff >= diff_thr + + +def _even(x: int) -> int: + return x - (x % 2) + + +def _parse_cropdetect(stderr: str): + m_last = None + for line in stderr.splitlines(): + m = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", line) + if m: + m_last = m + if not m_last: + return None + w, h, x, y = map(int, m_last.groups()) + return (_even(w), _even(h), _even(x), _even(y)) + + +def _deborder_ffmpeg(ffmpeg_path: str, in_video: str, out_video: str, logger): + cmd1 = [ + ffmpeg_path, "-hide_banner", "-y", + "-ss", "0", "-i", in_video, "-t", "2", + "-vf", "cropdetect=24:16:0", + "-f", "null", "-" + ] + logger.info("cropdetect cmd: " + " ".join(cmd1)) + p1 = subprocess.run(cmd1, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + crop = _parse_cropdetect(p1.stderr) + if not crop: + logger.warning("cropdetect found nothing, keep original (copy).") + cmdc = [ffmpeg_path, "-hide_banner", "-y", "-i", in_video, "-c", "copy", out_video] + p = subprocess.run(cmdc, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + raise RuntimeError(f"ffmpeg copy failed.\n{p.stderr}") + return None + + w, h, x, y = crop + cmd2 = [ + ffmpeg_path, "-hide_banner", "-y", + "-i", in_video, + "-vf", f"crop={w}:{h}:{x}:{y}", + "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", "-pix_fmt", "yuv420p", + "-c:a", "copy", + out_video + ] + logger.info("crop cmd: " + " ".join(cmd2)) + p2 = subprocess.run(cmd2, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p2.returncode != 0: + raise RuntimeError(f"ffmpeg crop failed.\n{p2.stderr}") + return {"w": w, "h": h, "x": x, "y": y} + + +def _extract_texts_from_any(res): + """ + 兼容 PaddleOCR 多种返回: + - 传统:res = [ [ [box,(text,score)], ... ] ] + - 新 pipeline/dict:res 可能是 dict/对象,里头有 'rec_texts'/'rec_scores' 或 'texts'/'scores' + 返回: list[(text,score)] + """ + out = [] + + # dict 风格 + if isinstance(res, dict): + keys_text = ["rec_texts", "texts", "text"] + keys_score = ["rec_scores", "scores", "score"] + texts = None + scores = None + for kt in keys_text: + if kt in res: + texts = res[kt] + break + for ks in keys_score: + if ks in res: + scores = res[ks] + break + + if texts is not None: + if isinstance(texts, str): + out.append((texts, float(scores) if scores is not None else 0.0)) + return out + if isinstance(texts, (list, tuple)): + if scores is None: + for t in texts: + out.append((str(t), 0.0)) + else: + if isinstance(scores, (list, tuple)) and len(scores) == len(texts): + for t, s in zip(texts, scores): + out.append((str(t), float(s))) + else: + for t in texts: + out.append((str(t), float(scores) if scores is not None else 0.0)) + return out + + if "result" in res: + return _extract_texts_from_any(res["result"]) + + # list 风格(传统) + if isinstance(res, list): + if len(res) == 0: + return out + + if isinstance(res[0], dict): + for item in res: + out.extend(_extract_texts_from_any(item)) + return out + + lines = res[0] if isinstance(res[0], list) else res + for line in lines: + try: + if isinstance(line, (list, tuple)) and len(line) >= 2: + info = line[1] + if isinstance(info, (list, tuple)) and len(info) >= 2: + out.append((str(info[0]), float(info[1]))) + elif isinstance(info, str): + out.append((info, 0.0)) + except Exception: + continue + return out + + # 兜底 + try: + s = str(res) + if s: + out.append((s, 0.0)) + except Exception: + pass + return out + + +class VideoSubtitleOCR: + """字幕 OCR(自动去黑边 + 固定下30% + 英文空格修复 + 去重合并) + + params: + - preprocess_deborder: bool, default True + - sample_fps: float, default 1.0 + - max_frames: int, default 240 + - subtitle_ratio: float, default 0.30 + - ocr_lang: ch|en, default ch + - min_score: float, default 0.0 + - roi_diff_thr: float, default 4.0 + - gap_merge_sec: float, default 1.2 # ✅ 更容易合并跨帧字幕 + - fix_english_space: bool, default True # ✅ 英文空格修复开关 + + outputs: + - artifacts/subtitles.json + - artifacts/subtitles.srt + - artifacts/frames/subtitle_*.jpg + - artifacts/deborder.mp4 (if preprocess_deborder=True) + """ + + @staticmethod + def execute(sample, params): + os.environ.setdefault("PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK", "True") + + in_video = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + op_name = "video_subtitle_ocr" + out_dir = make_run_dir(export_path, op_name) + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + frames_dir = ensure_dir(os.path.join(art_dir, "frames")) + logger = get_logger(op_name, log_dir) + + ffmpeg_path = params.get("ffmpeg_path") or shutil.which("ffmpeg") + if not ffmpeg_path: + raise RuntimeError("ffmpeg not found") + + # ✅ 默认自动去黑边 + if params.get("preprocess_deborder", True): + deborder_mp4 = os.path.join(art_dir, "deborder.mp4") + crop = _deborder_ffmpeg(ffmpeg_path, in_video, deborder_mp4, logger) + with open(os.path.join(art_dir, "deborder_crop.json"), "w", encoding="utf-8") as f: + json.dump({"crop": crop, "deborder_mp4": deborder_mp4}, f, ensure_ascii=False, indent=2) + src_video = deborder_mp4 + else: + src_video = in_video + + logger.info(f"video={src_video}") + logger.info(f"out_dir={out_dir}") + + from paddleocr import PaddleOCR + ocr_lang = params.get("ocr_lang", "ch") + ocr = build_paddle_ocr(params, ocr_lang=ocr_lang, use_angle_cls=False) + + fps, w, h, total = get_video_info(src_video) + sample_fps = float(params.get("sample_fps", 1.0)) + max_frames = int(params.get("max_frames", 240)) + subtitle_ratio = float(params.get("subtitle_ratio", 0.30)) + min_score = float(params.get("min_score", 0.0)) + roi_diff_thr = float(params.get("roi_diff_thr", 4.0)) + gap_merge = float(params.get("gap_merge_sec", 1.2)) + fix_en_space = bool(params.get("fix_english_space", True)) + + step = max(1, int(round(fps / max(sample_fps, 0.0001)))) + idxs = list(range(0, total, step)) + if max_frames and len(idxs) > max_frames: + n = max_frames + idxs = [idxs[int(i * (len(idxs) - 1) / max(1, n - 1))] for i in range(n)] + + cap = cv2.VideoCapture(src_video) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {src_video}") + + raw_hits = [] + last_roi = None + + for k, fi in enumerate(idxs): + cap.set(cv2.CAP_PROP_POS_FRAMES, int(fi)) + ok, frame = cap.read() + if not ok or frame is None: + continue + + t = float(fi / fps) if fps else 0.0 + y0 = int(h * (1.0 - subtitle_ratio)) + roi = frame[y0:h, 0:w] + + if not _roi_changed(roi, last_roi, diff_thr=roi_diff_thr): + continue + last_roi = roi + + jpg_path = os.path.join(frames_dir, f"subtitle_{int(fi):06d}.jpg") + cv2.imwrite(jpg_path, roi) + + res = ocr.ocr(roi) + pairs = _extract_texts_from_any(res) + texts = [txt for (txt, sc) in pairs if txt and float(sc) >= min_score] + + text = _clean_text(" ".join(texts)) + if fix_en_space: + text = _fix_english_spacing(text) + + if text: + raw_hits.append({"t": t, "text": text, "key": _norm_sub_key(text), "frame_id": int(fi), "jpg": jpg_path}) + + if (k + 1) % 20 == 0 or k == len(idxs) - 1: + logger.info(f"[{k+1}/{len(idxs)}] frame={fi} hit={1 if text else 0} len={len(text)}") + + cap.release() + + # ✅ 合并相邻相同字幕(按规范化 key 合并) + segments = [] + for hit in raw_hits: + if not segments: + segments.append({ + "start": hit["t"], + "end": hit["t"], + "text": hit["text"], + "key": hit["key"], + "evidence": [{"t": hit["t"], "frame_id": hit["frame_id"], "jpg": hit["jpg"]}], + }) + continue + + last = segments[-1] + if hit["key"] == last["key"] and (hit["t"] - last["end"] <= gap_merge): + last["end"] = hit["t"] + last["evidence"].append({"t": hit["t"], "frame_id": hit["frame_id"], "jpg": hit["jpg"]}) + else: + segments.append({ + "start": hit["t"], + "end": hit["t"], + "text": hit["text"], + "key": hit["key"], + "evidence": [{"t": hit["t"], "frame_id": hit["frame_id"], "jpg": hit["jpg"]}], + }) + + # end 往后延一点,srt 更自然 + for seg in segments: + seg["end"] = float(seg["end"] + max(0.4, 1.0 / max(sample_fps, 0.1))) + + # 输出时不需要 key(但保留也无所谓;你想更干净就删掉) + json_path = os.path.join(art_dir, "subtitles.json") + srt_path = os.path.join(art_dir, "subtitles.srt") + with open(json_path, "w", encoding="utf-8") as f: + json.dump({"segments": segments}, f, ensure_ascii=False, indent=2) + _write_srt(segments, srt_path) + + logger.info(f"Done. subtitles={len(segments)} srt={srt_path}") + return {"out_dir": out_dir, "subtitles_json": json_path, "subtitles_srt": srt_path, "count": len(segments)} \ No newline at end of file diff --git a/runtime/ops/mapper/video_summary_qwenvl/__init__.py b/runtime/ops/mapper/video_summary_qwenvl/__init__.py new file mode 100644 index 00000000..6e9386f9 --- /dev/null +++ b/runtime/ops/mapper/video_summary_qwenvl/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoSummaryQwenVL", + module_path="ops.mapper.video_summary_qwenvl.process", +) diff --git a/runtime/ops/mapper/video_summary_qwenvl/metadata.yml b/runtime/ops/mapper/video_summary_qwenvl/metadata.yml new file mode 100644 index 00000000..5f34dd41 --- /dev/null +++ b/runtime/ops/mapper/video_summary_qwenvl/metadata.yml @@ -0,0 +1,16 @@ +name: '视频摘要(QwenVL)' +name_en: 'Video Summary (QwenVL)' +description: '抽多帧拼 montage,只调用一次 QwenVL summary,输出 summary.json(含 montage.jpg 与证据帧)。' +description_en: 'Build montage from sampled frames, call QwenVL summary once; outputs summary.json with montage and evidence.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoSummaryQwenVL' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_summary_qwenvl/process.py b/runtime/ops/mapper/video_summary_qwenvl/process.py new file mode 100644 index 00000000..19efc107 --- /dev/null +++ b/runtime/ops/mapper/video_summary_qwenvl/process.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +import os +import json +import math +import cv2 + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from .._video_common.qwen_http_client import qwenvl_infer_by_image_path, save_frame_to_jpg + + +def _sample_frame_indices(total_frames: int, fps: float, sample_fps: float, max_frames: int): + if total_frames <= 0: + return [] + fps = float(fps) if fps else 25.0 + step = max(1, int(round(fps / max(float(sample_fps), 1e-6)))) + idxs = list(range(0, total_frames, step)) + if max_frames and len(idxs) > int(max_frames): + n = int(max_frames) + idxs = [idxs[int(i * (len(idxs) - 1) / max(1, n - 1))] for i in range(n)] + return idxs + + +def _make_montage(frames, cell_w=384, cell_h=216, max_cols=4): + n = len(frames) + cols = min(max_cols, n) + rows = int(math.ceil(n / cols)) + canvas = 255 * (cv2.cvtColor(cv2.UMat(cell_h * rows, cell_w * cols, cv2.CV_8UC3), cv2.COLOR_BGR2RGB).get()) + canvas[:] = 255 + for i, img in enumerate(frames): + r = i // cols + c = i % cols + x0, y0 = c * cell_w, r * cell_h + resized = cv2.resize(img, (cell_w, cell_h)) + canvas[y0 : y0 + cell_h, x0 : x0 + cell_w] = resized + return canvas + + +class VideoSummaryQwenVL: + """ + 抽多帧拼 montage → QwenVL HTTP 生成摘要(对齐服务端 task=summary): + 返回: {summary} + + params: + - service_url: 默认 http://127.0.0.1:18080 + - timeout_sec: 默认 180 + - sample_fps: 默认 1.0 + - max_frames: 默认 12 + - language: 默认 zh + - style: 默认 normal + - max_new_tokens: 默认 160 + - montage_cell_w: 默认 384 + - montage_cell_h: 默认 216 + - montage_max_cols: 默认 4 + outputs: + - artifacts/montage.jpg + - artifacts/summary.json + - artifacts/frames/*.jpg + """ + + def execute(self, sample, params=None): + params = params or {} + video_path = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + + out_dir = make_run_dir(export_path, "video_summary_qwenvl") + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + frames_dir = ensure_dir(os.path.join(art_dir, "frames")) + logger = get_logger("VideoSummaryQwenVL", log_dir) + + service_url = params.get("service_url", "http://127.0.0.1:18080") + timeout_sec = int(params.get("timeout_sec", 180)) + + sample_fps = float(params.get("sample_fps", 1.0)) + max_frames = int(params.get("max_frames", 12)) + language = params.get("language", "zh") + style = params.get("style", "normal") + max_new_tokens = int(params.get("max_new_tokens", 160)) + + cell_w = int(params.get("montage_cell_w", 384)) + cell_h = int(params.get("montage_cell_h", 216)) + max_cols = int(params.get("montage_max_cols", 4)) + + fps, W, H, total_frames = get_video_info(video_path) + idxs = _sample_frame_indices(total_frames, fps, sample_fps, max_frames) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {video_path}") + + frames = [] + evidence = [] + + for idx in idxs: + cap.set(cv2.CAP_PROP_POS_FRAMES, idx) + ok, frame = cap.read() + if not ok: + continue + frame_jpg = os.path.join(frames_dir, f"{idx:06d}.jpg") + save_frame_to_jpg(frame, frame_jpg) + frames.append(frame) + evidence.append({"frame_idx": idx, "image_path": frame_jpg}) + + cap.release() + + montage_path = os.path.join(art_dir, "montage.jpg") + summary = "" + + if frames: + montage = _make_montage(frames, cell_w=cell_w, cell_h=cell_h, max_cols=max_cols) + cv2.imwrite(montage_path, montage) + + res = qwenvl_infer_by_image_path( + image_path=montage_path, + task="summary", + service_url=service_url, + max_new_tokens=max_new_tokens, + language=language, + style=style, + timeout=timeout_sec, + ) + summary = (res.get("summary") or "").strip() + + out_json = os.path.join(art_dir, "summary.json") + with open(out_json, "w", encoding="utf-8") as f: + json.dump( + { + "summary": summary, + "service_url": service_url, + "sample_fps": sample_fps, + "max_frames": max_frames, + "language": language, + "style": style, + "evidence": evidence, + "montage": montage_path, + }, + f, + ensure_ascii=False, + indent=2, + ) + + logger.info(f"Done. summary_json={out_json}") + return {"out_dir": out_dir, "summary_json": out_json, "montage_jpg": montage_path} \ No newline at end of file diff --git a/runtime/ops/mapper/video_text_ocr/__init__.py b/runtime/ops/mapper/video_text_ocr/__init__.py new file mode 100644 index 00000000..74283ffb --- /dev/null +++ b/runtime/ops/mapper/video_text_ocr/__init__.py @@ -0,0 +1,6 @@ +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="VideoTextOCR", + module_path="ops.mapper.video_text_ocr.process", +) diff --git a/runtime/ops/mapper/video_text_ocr/metadata.yml b/runtime/ops/mapper/video_text_ocr/metadata.yml new file mode 100644 index 00000000..5f911d60 --- /dev/null +++ b/runtime/ops/mapper/video_text_ocr/metadata.yml @@ -0,0 +1,16 @@ +name: '视频显著文字OCR提取' +name_en: 'Video Text OCR' +description: '对视频上方/主要区域显著文字进行OCR识别,输出 text_ocr.json;可选自动去黑边、抽帧、跳过相似帧。' +description_en: 'OCR for salient texts on main/top region, outputs text_ocr.json; optional deborder, sampling, frame skipping.' +language: 'python' +vendor: 'huawei' +raw_id: 'VideoTextOCR' +version: '1.0.0' +types: + - 'annotation' +modal: 'video' +effect: + before: '' + after: '' +inputs: 'video' +outputs: 'text' \ No newline at end of file diff --git a/runtime/ops/mapper/video_text_ocr/process.py b/runtime/ops/mapper/video_text_ocr/process.py new file mode 100644 index 00000000..082b2143 --- /dev/null +++ b/runtime/ops/mapper/video_text_ocr/process.py @@ -0,0 +1,288 @@ +# -*- coding: utf-8 -*- +import os +import json +import re +import shutil +import subprocess +import cv2 +import numpy as np +import inspect +from collections import Counter + +from .._video_common.paths import make_run_dir, ensure_dir +from .._video_common.log import get_logger +from .._video_common.io_video import get_video_info +from paddleocr import PaddleOCR +from .._video_common.model_paths import resolve_model_path + +def build_paddle_ocr(params, ocr_lang: str, use_angle_cls: bool): + """ + 默认模型目录: + /mnt/models/ocr/det + /mnt/models/ocr/rec + /mnt/models/ocr/cls + 也支持 params['ocr_model_dir'] 指定(相对/绝对)。 + """ + ocr_root = resolve_model_path(params, "ocr_model_dir", "ocr") + det_dir = os.path.join(ocr_root, "det") + rec_dir = os.path.join(ocr_root, "rec") + cls_dir = os.path.join(ocr_root, "cls") + + # 目录不存在就直接报错,让用户去模型仓下载到固定位置 + for p in [det_dir, rec_dir] + ([cls_dir] if use_angle_cls else []): + if not os.path.exists(p): + raise RuntimeError(f"PaddleOCR model dir not found: {p}. Please download OCR models into model repo path.") + + sig = inspect.signature(PaddleOCR.__init__) + kw = {"lang": ocr_lang} + if "use_angle_cls" in sig.parameters: + kw["use_angle_cls"] = use_angle_cls + # PaddleOCR 3.4.0 支持这些 + if "det_model_dir" in sig.parameters: + kw["det_model_dir"] = det_dir + if "rec_model_dir" in sig.parameters: + kw["rec_model_dir"] = rec_dir + if "cls_model_dir" in sig.parameters and use_angle_cls: + kw["cls_model_dir"] = cls_dir + + return PaddleOCR(**kw) + +def _clean_text(t: str) -> str: + if not t: + return "" + t = t.strip() + t = re.sub(r"\s+", " ", t) + return t + + +def _roi_changed(cur_roi, last_roi, diff_thr=6.0): + if last_roi is None: + return True + a = cv2.cvtColor(cur_roi, cv2.COLOR_BGR2GRAY) + b = cv2.cvtColor(last_roi, cv2.COLOR_BGR2GRAY) + if a.shape != b.shape: + b = cv2.resize(b, (a.shape[1], a.shape[0]), interpolation=cv2.INTER_AREA) + diff = np.mean(np.abs(a.astype(np.float32) - b.astype(np.float32))) + return diff >= diff_thr + + +def _even(x: int) -> int: + return x - (x % 2) + + +def _parse_cropdetect(stderr: str): + m_last = None + for line in stderr.splitlines(): + m = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", line) + if m: + m_last = m + if not m_last: + return None + w, h, x, y = map(int, m_last.groups()) + return (_even(w), _even(h), _even(x), _even(y)) + + +def _deborder_ffmpeg(ffmpeg_path: str, in_video: str, out_video: str, logger): + cmd1 = [ + ffmpeg_path, "-hide_banner", "-y", + "-ss", "0", "-i", in_video, "-t", "2", + "-vf", "cropdetect=24:16:0", + "-f", "null", "-" + ] + logger.info("cropdetect cmd: " + " ".join(cmd1)) + p1 = subprocess.run(cmd1, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + crop = _parse_cropdetect(p1.stderr) + if not crop: + logger.warning("cropdetect found nothing, keep original (copy).") + cmdc = [ffmpeg_path, "-hide_banner", "-y", "-i", in_video, "-c", "copy", out_video] + p = subprocess.run(cmdc, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p.returncode != 0: + raise RuntimeError(f"ffmpeg copy failed.\n{p.stderr}") + return None + + w, h, x, y = crop + cmd2 = [ + ffmpeg_path, "-hide_banner", "-y", + "-i", in_video, + "-vf", f"crop={w}:{h}:{x}:{y}", + "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", "-pix_fmt", "yuv420p", + "-c:a", "copy", + out_video + ] + logger.info("crop cmd: " + " ".join(cmd2)) + p2 = subprocess.run(cmd2, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if p2.returncode != 0: + raise RuntimeError(f"ffmpeg crop failed.\n{p2.stderr}") + return {"w": w, "h": h, "x": x, "y": y} + + +def _extract_texts_from_any(res): + out = [] + if isinstance(res, dict): + for kt in ["rec_texts", "texts", "text"]: + if kt in res: + texts = res[kt] + scores = res.get("rec_scores", res.get("scores", res.get("score", None))) + if isinstance(texts, str): + out.append((texts, float(scores) if scores is not None else 0.0)) + return out + if isinstance(texts, (list, tuple)): + if isinstance(scores, (list, tuple)) and len(scores) == len(texts): + for t, s in zip(texts, scores): + out.append((str(t), float(s))) + else: + for t in texts: + out.append((str(t), float(scores) if scores is not None else 0.0)) + return out + if "result" in res: + return _extract_texts_from_any(res["result"]) + + if isinstance(res, list): + if len(res) == 0: + return out + if isinstance(res[0], dict): + for item in res: + out.extend(_extract_texts_from_any(item)) + return out + lines = res[0] if isinstance(res[0], list) else res + for line in lines: + try: + if isinstance(line, (list, tuple)) and len(line) >= 2: + info = line[1] + if isinstance(info, (list, tuple)) and len(info) >= 2: + out.append((str(info[0]), float(info[1]))) + elif isinstance(info, str): + out.append((info, 0.0)) + except Exception: + continue + return out + + try: + s = str(res) + if s: + out.append((s, 0.0)) + except Exception: + pass + return out + + +def _is_garbage_text(t: str) -> bool: + if not t: + return True + s = t.replace(" ", "") + if len(s) < 2: + return True + letters = sum(c.isalpha() for c in s) + if letters / len(s) > 0.9: + uniq = len(set(s.lower())) + if uniq <= 5: + return True + cnt = Counter(s.lower()) + most = cnt.most_common(1)[0][1] + if most / len(s) > 0.65: + return True + return False + + +class VideoTextOCR: + """显著文字 OCR(自动去黑边 + 上70%) + + params: + - preprocess_deborder: bool, default True + - sample_fps: float, default 0.5 + - max_frames: int, default 120 + - top_ratio: float, default 0.70 + - ocr_lang: ch|en, default ch + - min_score: float, default 0.0 + - roi_diff_thr: float, default 6.0 + """ + + @staticmethod + def execute(sample, params): + os.environ.setdefault("PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK", "True") + + in_video = sample["filePath"] + export_path = sample.get("export_path", "./outputs") + op_name = "video_text_ocr" + out_dir = make_run_dir(export_path, op_name) + log_dir = ensure_dir(os.path.join(out_dir, "logs")) + art_dir = ensure_dir(os.path.join(out_dir, "artifacts")) + frames_dir = ensure_dir(os.path.join(art_dir, "frames")) + logger = get_logger(op_name, log_dir) + + ffmpeg_path = params.get("ffmpeg_path") or shutil.which("ffmpeg") + if not ffmpeg_path: + raise RuntimeError("ffmpeg not found") + + if params.get("preprocess_deborder", True): + deborder_mp4 = os.path.join(art_dir, "deborder.mp4") + crop = _deborder_ffmpeg(ffmpeg_path, in_video, deborder_mp4, logger) + with open(os.path.join(art_dir, "deborder_crop.json"), "w", encoding="utf-8") as f: + json.dump({"crop": crop, "deborder_mp4": deborder_mp4}, f, ensure_ascii=False, indent=2) + src_video = deborder_mp4 + else: + src_video = in_video + + logger.info(f"video={src_video}") + logger.info(f"out_dir={out_dir}") + + from paddleocr import PaddleOCR + ocr_lang = params.get("ocr_lang", "ch") + ocr = build_paddle_ocr(params, ocr_lang=ocr_lang, use_angle_cls=False) + + fps, w, h, total = get_video_info(src_video) + sample_fps = float(params.get("sample_fps", 0.5)) + max_frames = int(params.get("max_frames", 120)) + top_ratio = float(params.get("top_ratio", 0.70)) + min_score = float(params.get("min_score", 0.0)) + roi_diff_thr = float(params.get("roi_diff_thr", 6.0)) + + step = max(1, int(round(fps / max(sample_fps, 0.0001)))) + idxs = list(range(0, total, step)) + if max_frames and len(idxs) > max_frames: + n = max_frames + idxs = [idxs[int(i * (len(idxs) - 1) / max(1, n - 1))] for i in range(n)] + + cap = cv2.VideoCapture(src_video) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open video: {src_video}") + + hits = [] + last_roi = None + + for k, fi in enumerate(idxs): + cap.set(cv2.CAP_PROP_POS_FRAMES, int(fi)) + ok, frame = cap.read() + if not ok or frame is None: + continue + + t = float(fi / fps) if fps else 0.0 + y1 = int(h * top_ratio) + roi = frame[0:y1, 0:w] + + if not _roi_changed(roi, last_roi, diff_thr=roi_diff_thr): + continue + last_roi = roi + + jpg_path = os.path.join(frames_dir, f"text_{int(fi):06d}.jpg") + cv2.imwrite(jpg_path, roi) + + res = ocr.ocr(roi) + pairs = _extract_texts_from_any(res) + texts = [txt for (txt, sc) in pairs if txt and float(sc) >= min_score] + text = _clean_text(" ".join(texts)) + + if text and (not _is_garbage_text(text)): + hits.append({"t": t, "frame_id": int(fi), "text": text, "jpg": jpg_path}) + + if (k + 1) % 20 == 0 or k == len(idxs) - 1: + logger.info(f"[{k+1}/{len(idxs)}] frame={fi} hit={1 if text else 0} len={len(text)}") + + cap.release() + + json_path = os.path.join(art_dir, "text_ocr.json") + with open(json_path, "w", encoding="utf-8") as f: + json.dump({"hits": hits}, f, ensure_ascii=False, indent=2) + + logger.info(f"Done. hits={len(hits)}") + return {"out_dir": out_dir, "text_ocr_json": json_path, "count": len(hits)} \ No newline at end of file