Peter-Schorn/SpotifyAPI

Help? Extending pages of a Playlist<PlaylistItems>

danwood opened this issue · 5 comments

I'm trying to get all the pages of a Playlist using .extendPages(spotifyAPI). However, a Playlist is not Paginated; it's the PlaylistItems that is. So I can't call extendPages on the result of func playlist(_ playlist: SpotifyURIConvertible, market: String?).

If I were to call .map(\.items) before .extendPages(self) in my code, that converts the resulting publisher to a publisher of PlaylistItems, and I've lost the context of the Playlist publisher. So somehow I need to hang onto the Playlist and stitch together the results of the fully loaded Playlist items.

This is way above my Combine understanding! (As you can see from my work-in-progress code here, I'm more comfortable with async/await!) Can Peter or anybody else offer some suggestions? I'm not sure if it would be worth rolling this solution into the API for others to use in the future …

func playlist(_ uri: SpotifyURIConvertible) async throws -> Playlist<PlaylistItems>?
{
	let result: Playlist<PlaylistItems> = try await playlist(uri, market: Spotify.currentMarket)	// Playlist<PlaylistItems> . Not Paginated, but the PlaylistItems is.
		// IDEA: .map(\.items)		// This would convert to Paginated items, but how do we rebuild as Playlist<PlaylistItems>?
		.extendPages(self)		// above must be Paginated, so this doesn't work.
		.receive(on: RunLoop.main)
		.async() // my utility using `withCheckedThrowingContinuation` to convert to async code
	return result
}

After reading your comment more thoroughly, I have developed a solution that should satisfy all of your requirements. You should be able to easily adapt it to work in an async context.

do {

    // https://open.spotify.com/playlist/0DoorcbBIsa7J6NW9FlLio?si=a5d775610b124051
    // 387 songs (the web API only returns a max of 100 items per page)
    let playlistURI = SpotifyIdentifier(
        id: "0DoorcbBIsa7J6NW9FlLio",
        idCategory: .playlist
    )

    
    // `PlaylistItemContainer<PlaylistItem>` is each item (track/episode) in the playlist
    // `Playlist<PlaylistItems>` contains the details of the playlist, and the `items` property 
    // contains the first page of items
    let result: ([PlaylistItemContainer<PlaylistItem>], Playlist<PlaylistItems>) = try spotifyAPI
        .playlist(playlistURI)
        .flatMap({ playlist in
            // `flatMap` allows us to provide a closure that accepts the output
            // of the upstream publisher as input and creates a new publisher
            spotifyAPI
                .extendPagesConcurrently(playlist.items)  // retrieve all pages of results *concurrently*
                .collectAndSortByOffset()  // collect just the items in the pages into a single array
                // combine the current publisher (which collects all pages of items in the playlist)
                // with a publisher that just publishes the playlist that we already received
                .zip(Result<Playlist<PlaylistItems>, Error>.success(playlist).publisher)
        })
        // my own utility that waits for the publisher to return a value synchronously
        .waitForSingleValue()!

    // test output

    let playlistItems = result.0
    let playlist = result.1

    print("\nplaylist name: \(playlist.name)")
    print("items count: \(playlistItems.count)\n")
    
    let itemNames = playlistItems
        .compactMap(\.item?.name)
        .joined(separator: "\n")

    print("--------------------\nplaylist item names:\n\(itemNames)")


} catch let error {
    print("caught error: \(error)")
}
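
The comments above note that the pages are retrieved concurrently and then collected and sorted by offset. Because concurrent requests can complete in any order, the sorting step is what restores playlist order. The following is only a minimal sketch of that idea using a hypothetical `Page` type and a hypothetical `flattenSortedByOffset` function; it is not the library's implementation of `collectAndSortByOffset()`.

```swift
// Hypothetical stand-in for one page of results.
// (The library's real paging type carries more fields than this.)
struct Page<Item> {
    let offset: Int
    let items: [Item]
}

/// Sorts the pages by their offset and flattens them into a single array of items.
func flattenSortedByOffset<Item>(_ pages: [Page<Item>]) -> [Item] {
    pages
        .sorted { $0.offset < $1.offset }
        .flatMap { $0.items }
}

// Pages deliberately listed out of order, as they might arrive from concurrent requests.
let pages = [
    Page(offset: 2, items: ["c", "d"]),
    Page(offset: 0, items: ["a", "b"]),
    Page(offset: 4, items: ["e"])
]

let allItems = flattenSortedByOffset(pages)
print(allItems)
```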

Also, from just looking at your .async() combine utility, I'm curious: How does it handle publishers that finish normally without publishing any values? (My waitForSingleValue() utility returns nil in this case.)

@Peter-Schorn Thanks for your notes above. Here are the async functions I've come up with. Hopefully this is of use to somebody! Everything seems to work, fingers crossed!


enum AsyncError: Error {
	case finishedWithoutValue
}

extension Publisher where Output : PagingObjectProtocol {
	func asyncPaginated() async throws -> Output {
		try await withCheckedThrowingContinuation { continuation in
			var cancellable: AnyCancellable?
			var accumulatedValues: [Self.Output] = []
			cancellable = self
				.sink { completion in
					switch completion {
					case .finished:
						guard let first = accumulatedValues.first else { continuation.resume(throwing: AsyncError.finishedWithoutValue) ; return }
						guard accumulatedValues.count > 1 else { continuation.resume(with: .success(first)) ; return }	// No point in merging
						let allItems: [some Codable & Hashable] = accumulatedValues.map(\.items).flatMap { $0 }
						let result = PagingObject(href: first.href, items: allItems, limit: allItems.count, next: nil, previous: nil, offset: 0, total: allItems.count)
						continuation.resume(with: .success(result as! Self.Output))
					case let .failure(error):
						continuation.resume(throwing: error)
					}
					cancellable?.cancel()
				} receiveValue: {
					accumulatedValues.append($0)
				}
		}
	}
}

extension Publisher {
	func asyncCursorPaginated<T>() async throws -> Output
	where Output == CursorPagingObject<T>
	{
		try await withCheckedThrowingContinuation { continuation in
			var cancellable: AnyCancellable?
			var accumulatedValues: [Self.Output] = []
			cancellable = self
				.sink { completion in
					switch completion {
					case .finished:
						guard let first = accumulatedValues.first else { continuation.resume(throwing: AsyncError.finishedWithoutValue) ; return }
						guard accumulatedValues.count > 1 else { continuation.resume(with: .success(first)) ; return }	// No point in merging
						let allItems: [some Codable & Hashable] = accumulatedValues.map(\.items).flatMap { $0 }
						let result = CursorPagingObject(href: first.href, items: allItems, limit: allItems.count, next: nil, cursors: nil, total: allItems.count)
						continuation.resume(with: .success(result))
					case let .failure(error):
						continuation.resume(throwing: error)
					}
					cancellable?.cancel()
				} receiveValue: {
					accumulatedValues.append($0)
				}
		}
	}
}

// The original one, not for paginated, multi-piece content.
// Based on https://medium.com/geekculture/from-combine-to-async-await-c08bf1d15b77
// But it never receives the subsequent pages, since it just calls first().

extension Publisher {
	func async() async throws -> Output {
		try await withCheckedThrowingContinuation { continuation in
			var cancellable: AnyCancellable?
			var finishedWithoutValue = true
			cancellable = first()
				.sink { result in
					switch result {
					case .finished:
						if finishedWithoutValue {
							continuation.resume(throwing: AsyncError.finishedWithoutValue)
						}
					case let .failure(error):
						continuation.resume(throwing: error)
					}
					cancellable?.cancel()
				} receiveValue: { value in
					finishedWithoutValue = false
					continuation.resume(with: .success(value))
				}
		}
	}
}

There is a lot of repetition in your functions. All of them repeat the logic of transforming the publisher into a single async value:

extension Publisher {
	func async() async throws -> Output {
		try await withCheckedThrowingContinuation { continuation in
			var cancellable: AnyCancellable?
			var finishedWithoutValue = true
			cancellable = first()
				.sink { result in
					switch result {
					case .finished:
						if finishedWithoutValue {
							continuation.resume(throwing: AsyncError.finishedWithoutValue)
						}
					case let .failure(error):
						continuation.resume(throwing: error)
					}
					cancellable?.cancel()
				} receiveValue: { value in
					finishedWithoutValue = false
					continuation.resume(with: .success(value))
				}
		}
	}
}

I recommend that you use Publisher.values (Apple's implementation) instead and create a wrapper around it for cases where you only need (or expect) the first value:

extension Publisher {
    
    var firstValue: Output? {
        get async throws {
            for try await value in self.values {
                // wait for the first value in the `AsyncSequence`
                // and then return it
                return value
            }
            // if the sequence produces no elements, return `nil`
            // you could change this to `throw AsyncError.finishedWithoutValue`
            // this computed property is already marked as throwing
            return nil
        }
    }

}

This lets Apple take care of the work of consuming the published values and exposing them as an AsyncSequence (an AsyncThrowingPublisher for publishers that can fail). I trust Apple the most.
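
The comment inside `firstValue` mentions that you could throw instead of returning `nil`. Here is a minimal sketch of that variant. It is written as an extension on `AsyncSequence` rather than on `Publisher`, so it can be applied to `Publisher.values` or to any other async sequence. The name `firstValueOrThrow` is hypothetical, and `AsyncError` simply mirrors the enum defined earlier in this thread.

```swift
// Mirrors the `AsyncError` enum from earlier in this thread.
enum AsyncError: Error {
    case finishedWithoutValue
}

extension AsyncSequence {

    /// Returns the first element produced by the sequence, or throws
    /// `AsyncError.finishedWithoutValue` if it ends without producing one.
    /// (Hypothetical helper name; not part of any library.)
    func firstValueOrThrow() async throws -> Element {
        for try await value in self {
            // return as soon as the first element arrives
            return value
        }
        // the sequence finished normally without producing any elements
        throw AsyncError.finishedWithoutValue
    }

}
```

With this in place, something like `try await spotifyAPI.playlist(playlistURI).values.firstValueOrThrow()` should give a non-optional result, which removes the need for the `guard let` unwrapping step.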

Then, apply transformations to the stream or the elements it produces, if necessary. For example:

let playlist = try await spotifyAPI
    .playlist(playlistURI)
    .firstValue

guard let playlist = playlist else {
    fatalError("unexpectedly found nil for `playlist`")
}

let playlistItems = try await spotifyAPI
    .extendPagesConcurrently(playlist.items)
    .collectAndSortByOffset()
    .firstValue

Note that the code above is also another solution to your original question.

Here's another example:

let allPlaylistItems: [PlaylistItem] = try await spotifyAPI
    .playlistItems(playlistURI)
    // get all pages
    .extendPages(spotifyAPI)
    // transform to async sequence
    .values
    // THEN apply transformations when already in an async context
    .reduce(
        into: []
    ) { (partialResult: inout [PlaylistItem], playlistItems: PlaylistItems) -> Void in
        partialResult.append(contentsOf: playlistItems.items.compactMap(\.item))
    }


for playlistItem in allPlaylistItems {
    print(playlistItem.name)
}