Source code for kiui.video

import os
import subprocess
import json
import math
from typing import Tuple, Sequence

import cv2
import numpy as np
import torch
from rich.console import Console
from rich.table import Table

from kiui.typing import *


[docs] def read_video( path: str, mode: Literal["float", "uint8", "torch", "tensor"] = "float", order: Literal["RGB", "BGR"] = "RGB", ) -> Tuple[Union[ndarray, Tensor], float]: """Read a video file into a tensor / numpy array. Args: path: Path to the video file. mode: Returned data type. - ``"uint8"``: uint8 numpy array, [T, H, W, 3], range [0, 255] - ``"float"``: float32 numpy array, [T, H, W, 3], range [0, 1] - ``"torch"`` / ``"tensor"``: float32 torch tensor, [T, H, W, 3], range [0, 1] order: Channel order, ``"RGB"`` or ``"BGR"``. Returns: video: Video frames in the requested format. fps: Frames per second of the video. """ cap = cv2.VideoCapture(path) if not cap.isOpened(): raise FileNotFoundError(f"cannot open video: {path}") fps = cap.get(cv2.CAP_PROP_FPS) frames = [] while True: ok, frame = cap.read() if not ok: break # OpenCV reads in BGR if order == "RGB": frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frames.append(frame) cap.release() if len(frames) == 0: raise ValueError(f"no frames read from video: {path}") video = np.stack(frames, axis=0) # [T, H, W, 3], uint8 if mode == "uint8": return video, float(fps) elif mode == "float": return video.astype(np.float32) / 255.0, float(fps) elif mode in ["torch", "tensor"]: return torch.from_numpy(video.astype(np.float32) / 255.0), float(fps) else: raise ValueError(f"Unknown read_video mode {mode}")
[docs] def write_video( path: str, video: Union[Tensor, ndarray], fps: float, order: Literal["RGB", "BGR"] = "RGB", codec: str = "mp4v", ) -> None: """Write a video from frames. Args: path: Path to write the video file. video: Video frames, [T, H, W, C] where C is 3 or 4. Can be numpy array (uint8 or float in [0, 1]) or torch tensor. fps: Frames per second. order: Channel order of the input frames, ``"RGB"`` or ``"BGR"``. codec: FourCC codec string for OpenCV, e.g. ``"mp4v"``, ``"XVID"``. """ if torch.is_tensor(video): video = video.detach().cpu().numpy() if video.ndim == 3: video = video[None, ...] # [H, W, C] -> [1, H, W, C] if video.ndim != 4: raise ValueError(f"write_video expects [T, H, W, C], got shape {video.shape}") if video.dtype == np.float32 or video.dtype == np.float64: video = np.clip(video, 0.0, 1.0) video = (video * 255.0).astype(np.uint8) T, H, W, C = video.shape if C not in (3, 4): raise ValueError(f"write_video expects 3 or 4 channels, got {C}") fourcc = cv2.VideoWriter_fourcc(*codec) os.makedirs(os.path.dirname(path), exist_ok=True) if os.path.dirname(path) != "" else None writer = cv2.VideoWriter(path, fourcc, fps, (W, H)) for i in range(T): frame = video[i] if C == 4: frame = frame[..., :3] # drop alpha if order == "RGB": frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) writer.write(frame) writer.release()
[docs] def get_video_info(path: str) -> Dict[str, Any]: """Inspect a video file using ffprobe and return metadata. Requires ffmpeg / ffprobe to be installed in the system. Args: path: Path to the video file. Returns: dict with keys: - path - width, height - fps - duration (seconds) - codec - codec_tag (fourcc / sample entry, e.g. ``"hvc1"`` or ``"hev1"``) - bitrate (bits per second) - filesize (bytes) - num_frames - raw_size (uncompressed RGB size in bytes) - compression_ratio (raw_size / filesize) """ cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height,codec_name,codec_tag_string,avg_frame_rate,nb_frames:format=duration,bit_rate", "-of", "json", path, ] result = subprocess.run(cmd, check=True, capture_output=True, text=True) data = json.loads(result.stdout) stream = data["streams"][0] fmt = data["format"] width = int(stream["width"]) height = int(stream["height"]) codec = stream.get("codec_name", "unknown") codec_tag = stream.get("codec_tag_string", None) fps_str = stream.get("avg_frame_rate", "0/0") num, den = fps_str.split("/") fps = float(num) / float(den) if float(den) != 0 else 0.0 duration = float(fmt.get("duration", 0.0)) bitrate = int(fmt.get("bit_rate", 0)) nb_frames_str = stream.get("nb_frames", None) if nb_frames_str is not None and nb_frames_str not in ("0", ""): num_frames = int(nb_frames_str) elif fps > 0 and duration > 0: num_frames = int(round(fps * duration)) else: num_frames = 0 filesize = os.path.getsize(path) if os.path.exists(path) else 0 raw_size = width * height * 3 * num_frames # assume 8-bit RGB compression_ratio = (raw_size / filesize) if filesize > 0 else 0.0 return { "path": path, "width": width, "height": height, "fps": fps, "duration": duration, "codec": codec, "codec_tag": codec_tag, "bitrate": bitrate, "filesize": filesize, "num_frames": num_frames, "raw_size": raw_size, "compression_ratio": compression_ratio, }
def _infer_encoder(src_codec: str, codec: Optional[str]) -> str: """Infer a reasonable encoder name from source codec if codec is None.""" if codec is not None: return codec if src_codec == "h264": return "libx264" if src_codec in ["hevc", "h265"]: return "libx265" if src_codec == "mpeg4": return "mpeg4" return "libx264" def _apply_codec_tag(cmd: list, encoder: str, src_tag: str) -> None: """Preserve container codec tag (sample entry) when reasonable. Examples: - keep ``hvc1`` vs ``hev1`` for HEVC - keep ``avc1`` vs ``avc3`` for H.264 """ lower_encoder = encoder.lower() tag = (src_tag or "").lower() if not tag: return if lower_encoder in ["libx265", "hevc", "h265"] and tag in ["hvc1", "hev1"]: cmd += ["-tag:v", tag] elif lower_encoder in ["libx264", "h264"] and tag in ["avc1", "avc3"]: cmd += ["-tag:v", tag] def _apply_rate_control( cmd: list, *, encoder: str, crf: Optional[int], preset: str, src_bitrate: int, src_w: int, src_h: int, dst_w: int, dst_h: int, src_fps: Optional[float], dst_fps: Optional[float], ) -> None: """Append bitrate / CRF options to ffmpeg command. If crf is None and bitrate is known, try to match source bitrate scaled by resolution and fps. Otherwise, fall back to CRF-based mode. """ lower_encoder = encoder.lower() if crf is None and src_bitrate > 0: # Match source bitrate scaled by resolution (and fps if changed) scale_ratio = (dst_w * dst_h) / max(1, src_w * src_h) if dst_fps is not None and src_fps is not None and src_fps > 0: scale_ratio *= dst_fps / src_fps target_bitrate = int(src_bitrate * scale_ratio) # clamp to a reasonable range target_bitrate = max(100_000, min(target_bitrate, src_bitrate * 2)) kbps = max(1, target_bitrate // 1000) cmd += [ "-b:v", f"{kbps}k", "-maxrate", f"{kbps}k", "-bufsize", f"{2 * kbps}k", ] return # CRF-based mode (or bitrate unknown) if crf is None: crf = 18 # sensible default when we cannot infer bitrate if lower_encoder == "mpeg4": # mpeg4 does not use CRF; use qscale instead (1 best, 31 worst) q = max(1, min(31, int(round(crf / 2)))) # map CRF 18 -> q≈9 as a heuristic cmd += [ "-q:v", str(q), ] else: cmd += [ "-crf", str(crf), "-preset", preset, ]
[docs] def resize_video( input_path: str, output_path: str, width: Optional[int] = None, height: Optional[int] = None, codec: Optional[str] = None, crf: Optional[int] = None, preset: str = "medium", fps: float = None, ) -> None: """Resize a video and save to a new file using ffmpeg. Args: input_path: Path to the input video. output_path: Path to the output video. width: Target width. If None, it will be inferred from ``height`` while keeping the aspect ratio. height: Target height. If None, it will be inferred from ``width`` while keeping the aspect ratio. codec: Video codec / encoder name for ffmpeg, e.g. ``"h264"``, ``"hevc"``, ``"libx264"``, ``"libx265"``, ``"mpeg4"``, ``"h264_nvenc"``, ``"hevc_nvenc"``. If None, try to pick a reasonable encoder based on the input codec. crf: Constant Rate Factor (quality, lower is better) for CRF-based codecs (e.g. libx264 / libx265). If None, the function will try to roughly match the source video's bitrate (scaled by resolution/fps) instead of using CRF. For ``"mpeg4"``, this is mapped to a quantizer value ``q:v`` internally when CRF is provided. preset: ffmpeg preset, e.g. ``"slow"``, ``"medium"``, ``"fast"``. fps: If not None, resample video to this FPS. """ os.makedirs(os.path.dirname(output_path), exist_ok=True) if os.path.dirname(output_path) != "" else None # inspect source info = get_video_info(input_path) src_w, src_h = info["width"], info["height"] src_fps = info["fps"] if info["fps"] > 0 else None src_bitrate = info["bitrate"] src_codec = (info["codec"] or "").lower() src_tag = (info.get("codec_tag") or "").lower() # infer missing spatial dimension to keep aspect ratio if width is None and height is None: # resolution unchanged, maybe we are changing fps/codec. width = src_w height = src_h if width is None: # infer width from height width = int(round(src_w * (height / src_h))) if height is None: # infer height from width height = int(round(src_h * (width / src_w))) # choose codec if not specified codec = _infer_encoder(src_codec, codec) lower_codec = codec.lower() scale_filter = f"scale={width}:{height}:flags=lanczos" cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", scale_filter, "-c:v", codec, ] # preserve container codec tag (sample entry) for better compatibility _apply_codec_tag(cmd, codec, src_tag) if fps is not None: cmd += ["-r", str(fps)] # quality / rate control (bitrate match or CRF) _apply_rate_control( cmd, encoder=codec, crf=crf, preset=preset, src_bitrate=src_bitrate, src_w=src_w, src_h=src_h, dst_w=width, dst_h=height, src_fps=src_fps, dst_fps=fps, ) cmd += [ "-c:a", "copy", output_path, ] subprocess.run(cmd, check=True)
[docs] def split_video( input_path: str, output_dir: str, timestamps: Sequence[float], codec: Optional[str] = None, crf: Optional[int] = None, preset: str = "medium", uniform: float = None, drop_last: bool = False, ) -> None: """Split a long video into shorter clips given split timestamps. Args: input_path: Path to the input video. output_dir: Directory to save the clips. timestamps: Sequence of timestamps in seconds. - If ``drop_last=False`` and ``uniform=None``: treated as absolute boundaries; the first segment always starts at 0 and the last one ends at the video duration. - If ``drop_last=True`` and ``uniform is None``: treated as segment lengths; the first segment is [0, timestamps[0]], the second is [timestamps[0], timestamps[0] + timestamps[1]], etc. Any remaining tail of the video is dropped. Ignored if ``uniform`` is not None. codec: Video codec / encoder name for ffmpeg, e.g. ``"h264"``, ``"hevc"``, ``"libx264"``, ``"libx265"``, ``"mpeg4"``, ``"h264_nvenc"``, ``"hevc_nvenc"``. If None, try to pick a reasonable encoder based on the input codec. crf: Constant Rate Factor (quality, lower is better). If None, the function will try to roughly match the source video's bitrate. preset: ffmpeg preset, e.g. ``"slow"``, ``"medium"``, ``"fast"``. uniform: If not None, split the video into uniform segments of this many seconds. If ``drop_last=True``, any remaining tail shorter than this interval is dropped. If ``drop_last=False``, a final shorter segment is kept. Cannot be used together with explicit ``timestamps``. drop_last: Drop any remaining part of the video that is not explicitly covered by timestamps or a full uniform interval. """ os.makedirs(output_dir, exist_ok=True) info = get_video_info(input_path) duration = info["duration"] src_w, src_h = info["width"], info["height"] src_fps = info["fps"] if info["fps"] > 0 else None src_bitrate = info["bitrate"] src_codec = (info["codec"] or "").lower() src_tag = (info.get("codec_tag") or "").lower() # choose codec if not specified codec = _infer_encoder(src_codec, codec) if uniform is not None and timestamps: raise ValueError("split_video: specify either timestamps or uniform, not both.") # build boundaries if uniform is not None: if uniform <= 0: raise ValueError("split_video: uniform interval must be > 0.") step = float(uniform) boundaries = [0.0] t = step while t < duration: boundaries.append(t) t += step if not drop_last and boundaries[-1] != duration: boundaries.append(duration) else: # timestamps-based splitting ts = [float(t) for t in timestamps if float(t) > 0.0] if drop_last: # interpret timestamps as segment lengths boundaries = [0.0] acc = 0.0 for seg_len in ts: acc += seg_len if acc >= duration: boundaries.append(duration) break boundaries.append(acc) else: # interpret timestamps as absolute boundaries within (0, duration) ts = sorted(t for t in ts if t < duration) boundaries = [0.0] + ts if boundaries[-1] != duration: boundaries.append(duration) base = os.path.splitext(os.path.basename(input_path))[0] for i in range(len(boundaries) - 1): start = boundaries[i] end = boundaries[i + 1] seg_len = max(0.0, end - start) seg_seconds = int(round(seg_len)) out_path = os.path.join(output_dir, f"{base}_{i:03d}_{seg_seconds}.mp4") cmd = [ "ffmpeg", "-y", "-i", input_path, "-ss", f"{start:.3f}", "-to", f"{end:.3f}", "-c:v", codec, ] # preserve container codec tag (sample entry) for better compatibility _apply_codec_tag(cmd, codec, src_tag) # quality / rate control (bitrate match or CRF, resolution unchanged) _apply_rate_control( cmd, encoder=codec, crf=crf, preset=preset, src_bitrate=src_bitrate, src_w=src_w, src_h=src_h, dst_w=src_w, dst_h=src_h, src_fps=src_fps, dst_fps=src_fps, ) cmd += [ "-c:a", "copy", out_path, ] subprocess.run(cmd, check=True)
def main(): import argparse parser = argparse.ArgumentParser(description="Simple video tools based on OpenCV + ffmpeg.") subparsers = parser.add_subparsers(dest="cmd", required=True) # info parser_info = subparsers.add_parser("info", help="Inspect video information.") parser_info.add_argument("path", type=str, help="Path to input video.") # resize parser_resize = subparsers.add_parser("resize", help="Resize a video and save to output.") parser_resize.add_argument("input", type=str, help="Path to input video.") parser_resize.add_argument("output", type=str, help="Path to output video.") parser_resize.add_argument( "--width", type=int, default=None, help="Target width. If omitted, inferred from height to keep aspect ratio.", ) parser_resize.add_argument( "--height", type=int, default=None, help="Target height. If omitted, inferred from width to keep aspect ratio.", ) parser_resize.add_argument( "--codec", type=str, default=None, choices=["h264", "hevc", "libx264", "libx265", "mpeg4", "h264_nvenc", "hevc_nvenc"], help="ffmpeg video codec / encoder name. If omitted, try to infer from the input.", ) parser_resize.add_argument( "--crf", type=int, default=None, help="ffmpeg CRF (quality). If omitted, try to roughly match the input bitrate.", ) parser_resize.add_argument("--preset", type=str, default="medium", help="ffmpeg preset.") # split parser_split = subparsers.add_parser("split", help="Split a video into clips.") parser_split.add_argument("input", type=str, help="Path to input video.") parser_split.add_argument("output_dir", type=str, help="Directory to save clips.") parser_split.add_argument( "--timestamps", type=float, nargs="*", default=None, help="Optional split timestamps in seconds, e.g. --timestamps 10 20 30.", ) parser_split.add_argument( "--uniform", type=float, default=None, help="Uniform segment length in seconds, e.g. --uniform 10.", ) parser_split.add_argument( "--drop_last", action="store_true", help="Drop any remaining tail not covered by timestamps or full uniform intervals.", ) parser_split.add_argument( "--codec", type=str, default=None, choices=["h264", "hevc", "libx264", "libx265", "mpeg4", "h264_nvenc", "hevc_nvenc"], help="ffmpeg video codec / encoder name. If omitted, try to infer from the input.", ) parser_split.add_argument( "--crf", type=int, default=None, help="ffmpeg CRF (quality). If omitted, try to roughly match the input bitrate.", ) parser_split.add_argument("--preset", type=str, default="medium", help="ffmpeg preset.") args = parser.parse_args() if args.cmd == "info": print_video_info(args.path) elif args.cmd == "resize": resize_video( input_path=args.input, output_path=args.output, width=args.width, height=args.height, codec=args.codec, crf=args.crf, preset=args.preset, ) elif args.cmd == "split": # basic validation for CLI: require either timestamps or uniform if (not args.timestamps or len(args.timestamps) == 0) and args.uniform is None: raise ValueError("split: please provide either --timestamps or --uniform.") if args.uniform is not None and args.timestamps and len(args.timestamps) > 0: raise ValueError("split: please provide only one of --timestamps or --uniform, not both.") split_video( input_path=args.input, output_dir=args.output_dir, timestamps=args.timestamps or [], codec=args.codec, crf=args.crf, preset=args.preset, uniform=args.uniform, drop_last=args.drop_last, ) if __name__ == "__main__": main()