diff --git a/.gitignore b/.gitignore index 1b7f1db..05eff5c 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,4 @@ tests/pytest.ini tests/htmlcov musicalgestures/pose/body_25/pose_iter_584000.caffemodel musicalgestures/pose/body_25/.wget-hsts +dancer_grid.png diff --git a/musicalgestures/_video.py b/musicalgestures/_video.py index 948dcde..b6bb0a3 100644 --- a/musicalgestures/_video.py +++ b/musicalgestures/_video.py @@ -276,8 +276,14 @@ def __repr__(self): return f"MgVideo('{self.filename}')" def numpy(self): - "Pipe all video frames from FFmpeg to numpy array" + """ + Read all video frames into a numpy array using FFmpeg. + Returns: + tuple: A tuple ``(array, fps)`` where ``array`` is a ``numpy.ndarray`` + of shape ``(N, H, W, 3)`` in BGR format (uint8) containing all N + frames, and ``fps`` is the frame rate of the video. + """ # Define ffmpeg command and load all the video frames in memory cmd = ["ffmpeg", "-y", "-i", self.filename] process = ffmpeg_cmd(cmd, total_time=self.length, pipe="load") @@ -289,13 +295,26 @@ def numpy(self): return array, self.fps def from_numpy(self, array, fps, target_name=None): - if target_name is not None: - self.filename = os.path.splitext(target_name)[0] + self.fex + """ + Writes a numpy array of video frames to a video file using FFmpeg. - if self.path is not None: - target_name = os.path.join(self.path, self.filename) + After writing, updates ``self.filename``, ``self.of``, and ``self.fex`` to + reflect the actual output path so that subsequent operations on this object + refer to the newly created file. + + Args: + array (np.ndarray): Video frames array with shape (N, H, W, 3) in BGR format. + fps (float): Frames per second for the output video. + target_name (str, optional): Full path for the output file. If None, uses + ``self.path/self.filename`` (or just ``self.filename`` if path is None). + Defaults to None. + """ + if target_name is not None: + write_path = os.path.splitext(target_name)[0] + self.fex + elif self.path is not None: + write_path = os.path.join(self.path, self.filename) else: - target_name = self.filename + write_path = self.filename process = None for frame in array: @@ -319,14 +338,16 @@ def from_numpy(self, array, fps, target_name=None): "libx264", "-pix_fmt", "yuv420p", - target_name, + write_path, ] process = ffmpeg_cmd(cmd, total_time=array.shape[0], pipe="write") process.stdin.write(frame.astype(np.uint8)) process.stdin.close() process.wait() - return + # Update self.filename to the actual written path so that get_video() can find the file + self.filename = write_path + self.of, self.fex = os.path.splitext(write_path) def extract_frame(self, **kwargs): """ diff --git a/tests/test_numpy.py b/tests/test_numpy.py new file mode 100644 index 0000000..6f7b551 --- /dev/null +++ b/tests/test_numpy.py @@ -0,0 +1,164 @@ +"""Tests for numpy array read/write and memory-based processing flow (issue #294).""" +import os +import numpy as np +import pytest +import musicalgestures +from musicalgestures._utils import extract_subclip + + +@pytest.fixture(scope="module") +def testvideo_avi(tmp_path_factory): + target_name = os.path.join(str(tmp_path_factory.mktemp("data")), "testvideo.avi") + return extract_subclip(musicalgestures.examples.dance, 5, 6, target_name=target_name) + + +class Test_MgVideo_numpy: + """Tests for MgVideo.numpy() – read video frames as numpy array.""" + + def test_returns_tuple(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.numpy() + assert isinstance(result, tuple) + assert len(result) == 2 + + def test_array_shape(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + # shape should be (N_frames, height, width, 3) + assert array.ndim == 4 + assert array.shape[1] == mg.height + assert array.shape[2] == mg.width + assert array.shape[3] == 3 + + def test_array_dtype(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + assert array.dtype == np.uint8 + + def test_fps_matches(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + assert fps == mg.fps + + def test_frame_count(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + from musicalgestures._utils import get_framecount + expected_frames = get_framecount(testvideo_avi) + assert array.shape[0] == expected_frames + + +class Test_MgAudio_numpy: + """Tests for MgAudio.numpy() – read audio as numpy array.""" + + def test_returns_array(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.audio.numpy() + assert isinstance(result, np.ndarray) + + def test_array_1d(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.audio.numpy() + assert result.ndim == 1 + + def test_sample_rate_set(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + mg.audio.numpy() + assert mg.audio.sr > 0 + + def test_array_length_matches_duration(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.audio.numpy() + # Audio duration = n_samples / sr, should be roughly 1 second (we extracted 5-6 s) + duration = len(result) / mg.audio.sr + assert 0.5 < duration < 2.0 + + +class Test_MgVideo_from_numpy: + """Tests for creating MgVideo from a numpy array (via __init__ array parameter).""" + + def test_init_with_array_no_path(self, testvideo_avi, tmp_path): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + out_file = str(tmp_path / "from_arr.avi") + new_mg = musicalgestures.MgVideo( + filename=out_file, + array=array[:30], + fps=fps, + ) + assert os.path.isfile(new_mg.filename) + assert new_mg.fps == fps + assert new_mg.width == array.shape[2] + assert new_mg.height == array.shape[1] + + def test_init_with_array_and_path(self, testvideo_avi, tmp_path): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + new_mg = musicalgestures.MgVideo( + filename="arr_output.avi", + array=array[:30], + fps=fps, + path=str(tmp_path), + ) + expected_path = os.path.join(str(tmp_path), "arr_output.avi") + assert new_mg.filename == expected_path + assert os.path.isfile(new_mg.filename) + + def test_from_numpy_direct_call(self, testvideo_avi, tmp_path): + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + target = str(tmp_path / "direct.avi") + mg.from_numpy(array[:30], fps, target_name=target) + assert os.path.isfile(target) + + def test_roundtrip_frame_count(self, testvideo_avi, tmp_path): + """Array written to disk should have the same number of frames.""" + mg = musicalgestures.MgVideo(testvideo_avi) + array, fps = mg.numpy() + n_frames = 20 + out_file = str(tmp_path / "roundtrip.avi") + new_mg = musicalgestures.MgVideo( + filename=out_file, + array=array[:n_frames], + fps=fps, + ) + from musicalgestures._utils import get_framecount + assert get_framecount(new_mg.filename) == n_frames + + +class Test_mg_grid_return_array: + """Tests for mg_grid() memory-based flow (return_array=True).""" + + def test_return_array_type(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.grid(height=100, rows=2, cols=2, return_array=True) + assert isinstance(result, np.ndarray) + + def test_return_array_dtype(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.grid(height=100, rows=2, cols=2, return_array=True) + assert result.dtype == np.uint8 + + def test_return_array_shape(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + rows, cols, height = 2, 3, 100 + result = mg.grid(height=height, rows=rows, cols=cols, return_array=True) + assert result.ndim == 3 + assert result.shape[0] == height * rows + assert result.shape[2] == 3 # RGB channels + + def test_no_file_written(self, testvideo_avi, tmp_path): + """return_array=True should not write any file to disk.""" + mg = musicalgestures.MgVideo(testvideo_avi) + of = os.path.splitext(testvideo_avi)[0] + expected_file = of + "_grid.png" + if os.path.exists(expected_file): + os.remove(expected_file) + mg.grid(height=100, rows=2, cols=2, return_array=True) + assert not os.path.exists(expected_file) + + def test_return_mgimage_when_no_array(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.grid(height=100, rows=2, cols=2, return_array=False) + assert isinstance(result, musicalgestures.MgImage) + assert os.path.isfile(result.filename)