Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4f81dfa
Linux build (#37)
lhoward Jul 18, 2023
3d685a5
Use OpenCombine on Linux (#37)
lhoward Jul 18, 2023
633491c
Android build
lhoward Oct 1, 2024
04f2c3d
workaround for swiftlang/swift#77315 compiler crash
lhoward Nov 1, 2024
9d66bde
Fix race condition in `AsyncCurrentValueSubject`
wfltaylor Jan 31, 2024
d6dbbae
Fix race condition in `AsyncThrowingCurrentValueSubject`
wfltaylor Jan 31, 2024
4246f11
fix: remove overlapping operators from swift-async-algorithms
lachenmayer Dec 6, 2023
484ea76
docs: remove sentence about overlaps in README
lachenmayer Dec 6, 2023
dd30442
re-add variadic merge
lachenmayer Dec 6, 2023
f41a9fb
Revert "Merge pull request #32 from sideeffect-io/fix/multicast-upstr…
lachenmayer Dec 18, 2023
e3de960
fix test compilation
lachenmayer Apr 18, 2024
8f7a200
update for new withTaskCancellationHandler parameter order
lhoward Nov 21, 2024
cfdfcec
don't use @_implementationOnly
lhoward Nov 21, 2024
84d16b6
use Atomics package instead of Locking where possible
lhoward Nov 21, 2024
b16126f
update to use await fulfillment(of:timeout:) testing API
lhoward Nov 21, 2024
ec16e3b
only take lock once in handleNewConsumer()
lhoward Nov 22, 2024
d050ac1
use Array instead of OrderedSet in awaitings, seems to fix memory leak
lhoward Nov 22, 2024
206d7b6
remove Package.resolved
lhoward Jan 3, 2025
d51c4aa
don't import Foundation if FoundationEssentials/Dispatch available
lhoward Jan 3, 2025
8f1da30
unregister channels after iteration complete
lhoward Feb 11, 2026
13e7d02
Fix impossible AND condition in MergeStateMachine error/termination c…
lhoward Mar 22, 2026
06741fe
Fix race condition in AsyncThrowingCurrentValueSubject.handleNewConsumer
lhoward Mar 22, 2026
c62574d
Fix race condition in AsyncReplaySubject.handleNewConsumer
lhoward Mar 22, 2026
b97d381
Fix race condition in AsyncThrowingReplaySubject.handleNewConsumer
lhoward Mar 22, 2026
13a2c2b
build: add CMake support for shared-library builds
lhoward May 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xcuserdata/
DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
Package.resolved
32 changes: 32 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#[[
This source file is part of the AsyncExtensions open source project

Copyright (c) 2024 The AsyncExtensions project authors
Licensed under MIT License

See https://github.com/sideeffect-io/AsyncExtensions/blob/main/LICENSE for license information
#]]

cmake_minimum_required(VERSION 3.16)
project(AsyncExtensions
LANGUAGES Swift)

list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/modules)

set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set(CMAKE_Swift_MODULE_DIRECTORY ${CMAKE_BINARY_DIR}/swift)

if(CMAKE_SYSTEM_NAME STREQUAL Windows OR CMAKE_SYSTEM_NAME STREQUAL Darwin)
option(BUILD_SHARED_LIBS "Build shared libraries by default" YES)
endif()

include(GNUInstallDirs)
include(SwiftSupport)

find_package(SwiftCollections REQUIRED)
find_package(SwiftAtomics REQUIRED)

add_subdirectory(Sources)
add_subdirectory(cmake/modules)
16 changes: 0 additions & 16 deletions Package.resolved

This file was deleted.

34 changes: 21 additions & 13 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.5
// swift-tools-version:6.0
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -16,23 +16,31 @@ let package = Package(
name: "AsyncExtensions",
targets: ["AsyncExtensions"]),
],
dependencies: [.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))],
dependencies: [
.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3")),
.package(url: "https://github.com/apple/swift-async-algorithms.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/OpenCombine/OpenCombine.git", from: "0.14.0"),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")),
],
targets: [
.target(
name: "AsyncExtensions",
dependencies: [.product(name: "Collections", package: "swift-collections")],
path: "Sources"
// ,
// swiftSettings: [
// .unsafeFlags([
// "-Xfrontend", "-warn-concurrency",
// "-Xfrontend", "-enable-actor-data-race-checks",
// ])
// ]
dependencies: [
.product(name: "Collections", package: "swift-collections"),
.product(name: "Atomics", package: "swift-atomics")
],
path: "Sources",
swiftSettings: [.swiftLanguageMode(.v5)]
),
.testTarget(
name: "AsyncExtensionsTests",
dependencies: ["AsyncExtensions"],
path: "Tests"),
dependencies: [
"AsyncExtensions",
.product(name: "OpenCombine", package: "OpenCombine", condition: .when(platforms: [.linux])),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms")
],
path: "Tests",
swiftSettings: [.swiftLanguageMode(.v5)]
),
]
)
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

**AsyncExtensions** provides a collection of operators that intends to ease the creation and combination of `AsyncSequences`.

**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms). For now there is an overlap between both libraries, but when **swift-async-algorithms** becomes stable the overlapping operators while be deprecated in **AsyncExtensions**. Nevertheless **AsyncExtensions** will continue to provide the operators that the community needs and are not provided by Apple.
**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms), which provides operators that the community needs and are not provided by Apple.

## Adding AsyncExtensions as a Dependency

Expand Down Expand Up @@ -44,11 +44,6 @@ AsyncStream)
* [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintains and replays a buffered amount of values

### Combiners
* [`zip(_:_:)`](./Sources/Combiners/Zip/AsyncZip2Sequence.swift): Zips two `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:_:_:)`](./Sources/Combiners/Zip/AsyncZip3Sequence.swift): Zips three `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:)`](./Sources/Combiners/Zip/AsyncZipSequence.swift): Zips any async sequences into an array of elements
* [`merge(_:_:)`](./Sources/Combiners/Merge/AsyncMerge2Sequence.swift): Merges two `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:_:_:)`](./Sources/Combiners/Merge/AsyncMerge3Sequence.swift): Merges three `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements
* [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence`
* [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences
Expand All @@ -58,7 +53,6 @@ AsyncStream)
* [AsyncFailSequence](./Sources/Creators/AsyncFailSequence.swift): Creates an `AsyncSequence` that immediately fails
* [AsyncJustSequence](./Sources/Creators/AsyncJustSequence.swift): Creates an `AsyncSequence` that emits an element an finishes
* [AsyncThrowingJustSequence](./Sources/Creators/AsyncThrowingJustSequence.swift): Creates an `AsyncSequence` that emits an elements and finishes bases on a throwing closure
* [AsyncLazySequence](./Sources/Creators/AsyncLazySequence.swift): Creates an `AsyncSequence` of the elements from the base sequence
* [AsyncTimerSequence](./Sources/Creators/AsyncTimerSequence.swift): Creates an `AsyncSequence` that emits a date value periodically
* [AsyncStream Pipe](./Sources/Creators/AsyncStream+Pipe.swift): Creates an AsyncStream and returns a tuple standing for its inputs and outputs

Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -69,27 +70,24 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case finished

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -155,12 +153,12 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda

func next(onSuspend: (() -> Void)? = nil) async -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return await withTaskCancellationHandler {
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -184,7 +182,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .finished:
Expand All @@ -200,12 +204,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -80,27 +81,24 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case terminated(Termination)

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -176,12 +174,12 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS

func next(onSuspend: (() -> Void)? = nil) async throws -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -208,7 +206,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .terminated(.finished):
Expand All @@ -227,12 +231,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
Loading