Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ tests/pytest.ini
tests/htmlcov
musicalgestures/pose/body_25/pose_iter_584000.caffemodel
musicalgestures/pose/body_25/.wget-hsts
dancer_grid.png
37 changes: 29 additions & 8 deletions musicalgestures/_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,14 @@ def __repr__(self):
return f"MgVideo('{self.filename}')"

def numpy(self):
"Pipe all video frames from FFmpeg to numpy array"
"""
Read all video frames into a numpy array using FFmpeg.

Returns:
tuple: A tuple ``(array, fps)`` where ``array`` is a ``numpy.ndarray``
of shape ``(N, H, W, 3)`` in BGR format (uint8) containing all N
frames, and ``fps`` is the frame rate of the video.
"""
# Define ffmpeg command and load all the video frames in memory
cmd = ["ffmpeg", "-y", "-i", self.filename]
process = ffmpeg_cmd(cmd, total_time=self.length, pipe="load")
Expand All @@ -289,13 +295,26 @@ def numpy(self):
return array, self.fps

def from_numpy(self, array, fps, target_name=None):
if target_name is not None:
self.filename = os.path.splitext(target_name)[0] + self.fex
"""
Writes a numpy array of video frames to a video file using FFmpeg.

if self.path is not None:
target_name = os.path.join(self.path, self.filename)
After writing, updates ``self.filename``, ``self.of``, and ``self.fex`` to
reflect the actual output path so that subsequent operations on this object
refer to the newly created file.

Args:
array (np.ndarray): Video frames array with shape (N, H, W, 3) in BGR format.
fps (float): Frames per second for the output video.
target_name (str, optional): Full path for the output file. If None, uses
``self.path/self.filename`` (or just ``self.filename`` if path is None).
Defaults to None.
"""
if target_name is not None:
write_path = os.path.splitext(target_name)[0] + self.fex
elif self.path is not None:
write_path = os.path.join(self.path, self.filename)
else:
target_name = self.filename
write_path = self.filename

process = None
for frame in array:
Expand All @@ -319,14 +338,16 @@ def from_numpy(self, array, fps, target_name=None):
"libx264",
"-pix_fmt",
"yuv420p",
target_name,
write_path,
]
process = ffmpeg_cmd(cmd, total_time=array.shape[0], pipe="write")
process.stdin.write(frame.astype(np.uint8))
process.stdin.close()
process.wait()

return
# Update self.filename to the actual written path so that get_video() can find the file
self.filename = write_path
self.of, self.fex = os.path.splitext(write_path)

def extract_frame(self, **kwargs):
"""
Expand Down
164 changes: 164 additions & 0 deletions tests/test_numpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""Tests for numpy array read/write and memory-based processing flow (issue #294)."""
import os
import numpy as np
import pytest
import musicalgestures
from musicalgestures._utils import extract_subclip


@pytest.fixture(scope="module")
def testvideo_avi(tmp_path_factory):
target_name = os.path.join(str(tmp_path_factory.mktemp("data")), "testvideo.avi")
return extract_subclip(musicalgestures.examples.dance, 5, 6, target_name=target_name)


class Test_MgVideo_numpy:
"""Tests for MgVideo.numpy() – read video frames as numpy array."""

def test_returns_tuple(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.numpy()
assert isinstance(result, tuple)
assert len(result) == 2

def test_array_shape(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
# shape should be (N_frames, height, width, 3)
assert array.ndim == 4
assert array.shape[1] == mg.height
assert array.shape[2] == mg.width
assert array.shape[3] == 3

def test_array_dtype(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
assert array.dtype == np.uint8

def test_fps_matches(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
assert fps == mg.fps

def test_frame_count(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
from musicalgestures._utils import get_framecount
expected_frames = get_framecount(testvideo_avi)
assert array.shape[0] == expected_frames


class Test_MgAudio_numpy:
"""Tests for MgAudio.numpy() – read audio as numpy array."""

def test_returns_array(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.audio.numpy()
assert isinstance(result, np.ndarray)

def test_array_1d(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.audio.numpy()
assert result.ndim == 1

def test_sample_rate_set(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
mg.audio.numpy()
assert mg.audio.sr > 0

def test_array_length_matches_duration(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.audio.numpy()
# Audio duration = n_samples / sr, should be roughly 1 second (we extracted 5-6 s)
duration = len(result) / mg.audio.sr
assert 0.5 < duration < 2.0


class Test_MgVideo_from_numpy:
"""Tests for creating MgVideo from a numpy array (via __init__ array parameter)."""

def test_init_with_array_no_path(self, testvideo_avi, tmp_path):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
out_file = str(tmp_path / "from_arr.avi")
new_mg = musicalgestures.MgVideo(
filename=out_file,
array=array[:30],
fps=fps,
)
assert os.path.isfile(new_mg.filename)
assert new_mg.fps == fps
assert new_mg.width == array.shape[2]
assert new_mg.height == array.shape[1]

def test_init_with_array_and_path(self, testvideo_avi, tmp_path):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
new_mg = musicalgestures.MgVideo(
filename="arr_output.avi",
array=array[:30],
fps=fps,
path=str(tmp_path),
)
expected_path = os.path.join(str(tmp_path), "arr_output.avi")
assert new_mg.filename == expected_path
assert os.path.isfile(new_mg.filename)

def test_from_numpy_direct_call(self, testvideo_avi, tmp_path):
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
target = str(tmp_path / "direct.avi")
mg.from_numpy(array[:30], fps, target_name=target)
assert os.path.isfile(target)

def test_roundtrip_frame_count(self, testvideo_avi, tmp_path):
"""Array written to disk should have the same number of frames."""
mg = musicalgestures.MgVideo(testvideo_avi)
array, fps = mg.numpy()
n_frames = 20
out_file = str(tmp_path / "roundtrip.avi")
new_mg = musicalgestures.MgVideo(
filename=out_file,
array=array[:n_frames],
fps=fps,
)
from musicalgestures._utils import get_framecount
assert get_framecount(new_mg.filename) == n_frames


class Test_mg_grid_return_array:
"""Tests for mg_grid() memory-based flow (return_array=True)."""

def test_return_array_type(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.grid(height=100, rows=2, cols=2, return_array=True)
assert isinstance(result, np.ndarray)

def test_return_array_dtype(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.grid(height=100, rows=2, cols=2, return_array=True)
assert result.dtype == np.uint8

def test_return_array_shape(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
rows, cols, height = 2, 3, 100
result = mg.grid(height=height, rows=rows, cols=cols, return_array=True)
assert result.ndim == 3
assert result.shape[0] == height * rows
assert result.shape[2] == 3 # RGB channels

def test_no_file_written(self, testvideo_avi, tmp_path):
"""return_array=True should not write any file to disk."""
mg = musicalgestures.MgVideo(testvideo_avi)
of = os.path.splitext(testvideo_avi)[0]
expected_file = of + "_grid.png"
if os.path.exists(expected_file):
os.remove(expected_file)
mg.grid(height=100, rows=2, cols=2, return_array=True)
assert not os.path.exists(expected_file)

def test_return_mgimage_when_no_array(self, testvideo_avi):
mg = musicalgestures.MgVideo(testvideo_avi)
result = mg.grid(height=100, rows=2, cols=2, return_array=False)
assert isinstance(result, musicalgestures.MgImage)
assert os.path.isfile(result.filename)
Loading