Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ jobs:
- os: macos-26-xlarge
name: macos-arm64
build_cmd: ./build.sh release-tests
# E2E not possible on GHA Mac runner currently
e2e-testing: false
e2e-testing: true
- os: macos-26-large
name: macos-x64
build_cmd: ./build.sh release-tests --macos-arch x86_64
# E2E not possible on GHA Mac runner currently
e2e-testing: false
e2e-testing: true
# Pinned to Windows 2022 for current VS 17 implementation
- os: windows-2022
name: windows-x64
Expand Down
140 changes: 0 additions & 140 deletions src/tests/integration/test_data_track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
#include <livekit/e2ee.h>
#include <livekit/remote_data_track.h>

#include <cmath>
#include <condition_variable>
#include <exception>
#include <future>
#include <tuple>

#include "../common/test_common.h"
#include "ffi_client.h"
#include "lk_log.h"

namespace livekit::test {

Expand Down Expand Up @@ -287,135 +284,9 @@ void runEncryptedDataTrackRoundTrip(KeyDerivationFunction key_derivation_functio

class DataTrackE2ETest : public LiveKitTestBase {};

class DataTrackTransportTest : public DataTrackE2ETest,
public ::testing::WithParamInterface<std::tuple<double, size_t>> {};

class DataTrackKeyDerivationTest : public DataTrackE2ETest,
public ::testing::WithParamInterface<KeyDerivationFunction> {};

TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) {
const auto publish_fps = std::get<0>(GetParam());
const auto payload_len = std::get<1>(GetParam());
const auto track_name = makeTrackName("transport");

// How long to publish frames for.
constexpr auto PUBLISH_DURATION = 10s;

// Percentage of total frames that must be received on the subscriber end in
// order for the test to pass.
constexpr float MIN_PERCENTAGE = 0.90f;

std::vector<TestRoomConnectionOptions> room_configs(2);
room_configs[0].room_options.single_peer_connection = false;
room_configs[1].room_options.single_peer_connection = false;

DataTrackPublishedDelegate subscriber_delegate;
room_configs[1].delegate = &subscriber_delegate;

auto rooms = testRooms(room_configs);
auto& publisher_room = rooms[0];
const auto publisher_identity = publisher_room->localParticipant()->identity();

auto track = requirePublishedTrack(publisher_room->localParticipant(), track_name);
std::cerr << "Track published\n";

auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout);
std::cerr << "Got remote track: " << remote_track->info().sid << "\n";

ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track";
EXPECT_TRUE(remote_track->isPublished());
EXPECT_FALSE(remote_track->info().uses_e2ee);
EXPECT_EQ(remote_track->info().name, track_name);
EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity);

const auto frame_count =
static_cast<size_t>(std::llround(std::chrono::duration<double>(PUBLISH_DURATION).count() * publish_fps));

auto publish = [&]() {
if (!track->isPublished()) {
throw std::runtime_error("Publisher failed to publish data track");
}
if (track->info().uses_e2ee) {
throw std::runtime_error("Unexpected E2EE on test data track");
}
if (track->info().name != track_name) {
throw std::runtime_error("Published track name mismatch");
}

const auto frame_interval = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(1.0 / publish_fps));
auto next_send = std::chrono::steady_clock::now();

std::cout << "Publishing " << frame_count << " frames with payload length " << payload_len << '\n';
for (size_t index = 0; index < frame_count; ++index) {
std::vector<std::uint8_t> payload(payload_len, static_cast<std::uint8_t>(index));
requirePushSuccess(track->tryPush(std::move(payload)), "Failed to push data frame");

next_send += frame_interval;
std::this_thread::sleep_until(next_send);
}

track->unpublishDataTrack();
};

auto subscribe_result = remote_track->subscribe();
if (!subscribe_result) {
FAIL() << describeDataTrackError(subscribe_result.error());
}
auto subscription = subscribe_result.value();

std::promise<size_t> receive_count_promise;
auto receive_count_future = receive_count_promise.get_future();

auto subscribe = [&]() {
size_t received_count = 0;
DataTrackFrame frame;
while (subscription->read(frame) && received_count < frame_count) {
if (frame.payload.empty()) {
throw std::runtime_error("Received empty data frame");
}

const auto first_byte = frame.payload.front();
if (!std::all_of(frame.payload.begin(), frame.payload.end(),
[first_byte](std::uint8_t byte) { return byte == first_byte; })) {
throw std::runtime_error("Received frame with inconsistent payload");
}
if (frame.user_timestamp.has_value()) {
throw std::runtime_error("Received unexpected user timestamp in transport test");
}

++received_count;
}

receive_count_promise.set_value(received_count);
};

// Launch both publisher and subscriber
auto pub_fut = std::async(std::launch::async, publish);
auto sub_fut = std::async(std::launch::async, subscribe);

// Wait for both, with a combined deadline (the timeout(...) wrapper).
const auto deadline = std::chrono::steady_clock::now() + PUBLISH_DURATION + 25s;

const bool pub_ok = pub_fut.wait_until(deadline) == std::future_status::ready;
const bool sub_ok = sub_fut.wait_until(deadline) == std::future_status::ready;

if (!pub_ok || !sub_ok) {
ADD_FAILURE() << "Timed out waiting for data frames";
}

// Equivalent of `try_join!`'s ? — re-throws any exception from either task
pub_fut.get();
sub_fut.get();

const auto received_count = receive_count_future.get();
const auto received_percent = static_cast<float>(received_count) / static_cast<float>(frame_count);
std::cout << "Received " << received_count << "/" << frame_count << " frames (" << received_percent * 100.0f << "%)"
<< '\n';

EXPECT_GE(received_percent, MIN_PERCENTAGE) << "Received " << received_count << "/" << frame_count << " frames";
}

TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) {
const auto track_name = makeTrackName("published_state");

Expand Down Expand Up @@ -853,21 +724,10 @@ TEST_F(DataTrackE2ETest, PreservesUserTimestampOnEncryptedDataTrack) {
local_track->unpublishDataTrack();
}

std::string dataTrackParamName(const ::testing::TestParamInfo<std::tuple<double, size_t>>& info) {
if (std::get<0>(info.param) > 100.0) {
return "HighFpsSinglePacket";
}
return "LowFpsMultiPacket";
}

std::string keyDerivationParamName(const ::testing::TestParamInfo<KeyDerivationFunction>& info) {
return keyDerivationFunctionName(info.param);
}

INSTANTIATE_TEST_SUITE_P(DataTrackScenarios, DataTrackTransportTest,
::testing::Values(std::make_tuple(120.0, size_t{8192}), std::make_tuple(10.0, size_t{196608})),
dataTrackParamName);

INSTANTIATE_TEST_SUITE_P(KeyDerivationFunctions, DataTrackKeyDerivationTest,
::testing::Values(KeyDerivationFunction::PBKDF2, KeyDerivationFunction::HKDF),
keyDerivationParamName);
Expand Down
Loading