From a4c0fa9a6e20a6879477dc7393564e4f19e5b5de Mon Sep 17 00:00:00 2001 From: cottongin Date: Wed, 11 Mar 2026 18:14:13 -0400 Subject: [PATCH] fix: resolve playback stop failure and Icecast stream disconnection AudioEngine.stop() only called Thread.interrupt(), which doesn't interrupt blocking InputStream.read() on OkHttp streams. This caused audio to continue after stop and blocked subsequent play attempts (old job never completed). Now closes timedStream to force the blocking read to fail. Removed LowLatencySocketFactory (16KB receive buffer) which triggered Icecast slow-client disconnection on burst-on-connect. Force HTTP/1.1 to avoid HTTP/2 negotiation issues with Icecast servers. Also fixed: awaitEngine() SharedFlow collector coroutine leak, and added MAX_CATCHUP_FRAMES safety cap to prevent infinite frame skipping. Made-with: Cursor --- .../cottongin/radio247/audio/AudioEngine.kt | 20 +++++++++++- .../radio247/audio/StreamConnection.kt | 32 ++----------------- .../radio247/service/RadioPlaybackService.kt | 19 +++++++++-- 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/app/src/main/java/xyz/cottongin/radio247/audio/AudioEngine.kt b/app/src/main/java/xyz/cottongin/radio247/audio/AudioEngine.kt index 1e1d46c..476ccce 100644 --- a/app/src/main/java/xyz/cottongin/radio247/audio/AudioEngine.kt +++ b/app/src/main/java/xyz/cottongin/radio247/audio/AudioEngine.kt @@ -5,6 +5,7 @@ import android.media.AudioFormat import android.media.AudioTrack import android.media.MediaCodec import android.media.MediaFormat +import android.util.Log import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import java.io.InputStream @@ -34,34 +35,43 @@ class AudioEngine( private var currentCodec: MediaCodec? = null @Volatile private var catchingUp = true + private var catchupFramesSkipped = 0 @Volatile private var timedStream: TimedInputStream? = null private var presentationTimeUs = 0L fun start() { + Log.i(TAG, "start() url=$url") running = true catchingUp = true + catchupFramesSkipped = 0 presentationTimeUs = 0L thread = Thread({ try { runPipeline() } catch (e: Exception) { if (running) { + Log.e(TAG, "Pipeline error (active): ${e.message}", e) val error = when (e) { is ConnectionFailed -> EngineError.ConnectionFailed(e) else -> EngineError.DecoderError(e) } _events.tryEmit(AudioEngineEvent.Error(error)) + } else { + Log.i(TAG, "Pipeline stopped: ${e.message}") } } finally { + Log.i(TAG, "Pipeline finally — emitting Stopped") _events.tryEmit(AudioEngineEvent.Stopped) } }, "AudioEngine").apply { start() } } fun stop() { + Log.i(TAG, "stop() called, running=$running, thread=${thread?.name}") running = false + try { timedStream?.close() } catch (_: Exception) {} thread?.interrupt() thread = null } @@ -71,8 +81,10 @@ class AudioEngine( } private fun runPipeline() { + Log.i(TAG, "runPipeline() connecting to $url") val connection = StreamConnection(url) connection.open() + Log.i(TAG, "Connected: metaint=${connection.metaint}, info=${connection.streamInfo}") val tStream = TimedInputStream(connection.inputStream!!) timedStream = tStream @@ -152,8 +164,12 @@ class AudioEngine( private fun decodeToPcm(codec: MediaCodec, mp3Frame: ByteArray, audioTrack: AudioTrack) { if (catchingUp) { + catchupFramesSkipped++ val lastReadMs = timedStream?.lastReadDurationMs ?: 0L - if (lastReadMs >= CATCHUP_THRESHOLD_MS) { + if (lastReadMs >= CATCHUP_THRESHOLD_MS || catchupFramesSkipped >= MAX_CATCHUP_FRAMES) { + if (catchupFramesSkipped >= MAX_CATCHUP_FRAMES) { + Log.w(TAG, "Catchup cap reached after $catchupFramesSkipped frames, starting playback") + } catchingUp = false } else { return @@ -205,8 +221,10 @@ class AudioEngine( } companion object { + private const val TAG = "AudioEngine" private const val FRAMES_PER_SECOND = 38 private const val CATCHUP_THRESHOLD_MS = 30L + private const val MAX_CATCHUP_FRAMES = FRAMES_PER_SECOND * 5 // 5 seconds max skip private const val FRAME_DURATION_US = 26_122L // 1152 samples at 44100 Hz } } diff --git a/app/src/main/java/xyz/cottongin/radio247/audio/StreamConnection.kt b/app/src/main/java/xyz/cottongin/radio247/audio/StreamConnection.kt index 1eb9354..c78f141 100644 --- a/app/src/main/java/xyz/cottongin/radio247/audio/StreamConnection.kt +++ b/app/src/main/java/xyz/cottongin/radio247/audio/StreamConnection.kt @@ -1,40 +1,12 @@ package xyz.cottongin.radio247.audio import okhttp3.OkHttpClient +import okhttp3.Protocol import okhttp3.Request import okhttp3.Response import java.io.IOException import java.io.InputStream -import java.net.InetAddress -import java.net.Socket import java.time.Duration -import javax.net.SocketFactory - -private const val SOCKET_RECV_BUF = 16_384 - -private class LowLatencySocketFactory : SocketFactory() { - private val delegate = getDefault() - - override fun createSocket(): Socket = - delegate.createSocket().applyOpts() - - override fun createSocket(host: String, port: Int): Socket = - delegate.createSocket(host, port).applyOpts() - - override fun createSocket(host: String, port: Int, localAddr: InetAddress, localPort: Int): Socket = - delegate.createSocket(host, port, localAddr, localPort).applyOpts() - - override fun createSocket(host: InetAddress, port: Int): Socket = - delegate.createSocket(host, port).applyOpts() - - override fun createSocket(host: InetAddress, port: Int, localAddr: InetAddress, localPort: Int): Socket = - delegate.createSocket(host, port, localAddr, localPort).applyOpts() - - private fun Socket.applyOpts(): Socket = apply { - receiveBufferSize = SOCKET_RECV_BUF - tcpNoDelay = true - } -} class ConnectionFailed(message: String, cause: Throwable? = null) : Exception(message, cause) @@ -46,7 +18,7 @@ data class StreamInfo( class StreamConnection(private val url: String) { private val client = OkHttpClient.Builder() - .socketFactory(LowLatencySocketFactory()) + .protocols(listOf(Protocol.HTTP_1_1)) .readTimeout(Duration.ofSeconds(30)) .build() diff --git a/app/src/main/java/xyz/cottongin/radio247/service/RadioPlaybackService.kt b/app/src/main/java/xyz/cottongin/radio247/service/RadioPlaybackService.kt index 460a51e..27c98fc 100644 --- a/app/src/main/java/xyz/cottongin/radio247/service/RadioPlaybackService.kt +++ b/app/src/main/java/xyz/cottongin/radio247/service/RadioPlaybackService.kt @@ -10,6 +10,7 @@ import android.net.NetworkCapabilities import android.net.NetworkRequest import android.os.IBinder import android.os.PowerManager +import android.util.Log import androidx.lifecycle.LifecycleService import android.support.v4.media.session.MediaSessionCompat import xyz.cottongin.radio247.RadioApplication @@ -43,6 +44,7 @@ class ConnectionFailedException(cause: Throwable) : Exception("Connection failed class RadioPlaybackService : LifecycleService() { companion object { + private const val TAG = "RadioPlayback" const val ACTION_PLAY = "xyz.cottongin.radio247.PLAY" const val ACTION_STOP = "xyz.cottongin.radio247.STOP" const val ACTION_PAUSE = "xyz.cottongin.radio247.PAUSE" @@ -95,6 +97,7 @@ class RadioPlaybackService : LifecycleService() { private var playJob: Job? = null private fun transition(newState: PlaybackState) { + Log.i(TAG, "transition → $newState") controller.updateState(newState) } @@ -125,13 +128,16 @@ class RadioPlaybackService : LifecycleService() { val oldJob = playJob val currentState = controller.state.value val isResume = currentState is PlaybackState.Paused && currentState.station.id == stationId + Log.i(TAG, "launchPlay stationId=$stationId isResume=$isResume hasOldJob=${oldJob != null}") playJob = serviceScope.launch { oldJob?.let { + Log.i(TAG, "Stopping old engine and joining old job…") if (!isResume) { stayConnected = false } engine?.stop() it.join() + Log.i(TAG, "Old job joined") } stayConnected = app.preferences.stayConnected.first() val station = stationDao.getStationById(stationId) @@ -179,11 +185,13 @@ class RadioPlaybackService : LifecycleService() { try { val urls = app.streamResolver.resolveUrls(station) + Log.i(TAG, "handlePlay resolved ${urls.size} URLs for '${station.name}': ${urls.take(3)}") startEngine(station, urls) if (stayConnected) { reconnectLoop(station) } - } catch (_: Exception) { + } catch (e: Exception) { + Log.w(TAG, "handlePlay caught: ${e.message}", e) if (stayConnected) { reconnectLoop(station) } @@ -219,6 +227,7 @@ class RadioPlaybackService : LifecycleService() { } private fun handleStop() { + Log.i(TAG, "handleStop()") stayConnected = false retryImmediatelyOnNetwork = false engine?.stop() @@ -307,7 +316,7 @@ class RadioPlaybackService : LifecycleService() { val deferred = CompletableDeferred() val connectionStartedAt = System.currentTimeMillis() - serviceScope.launch { + val collectJob = serviceScope.launch { engine!!.events.collect { event -> when (event) { is AudioEngineEvent.Started -> { @@ -363,7 +372,11 @@ class RadioPlaybackService : LifecycleService() { deferred.completeExceptionally(Exception("Event flow completed unexpectedly")) } } - deferred.await() + try { + deferred.await() + } finally { + collectJob.cancel() + } } private suspend fun reconnectLoop(station: Station) {