fix: resolve playback stop failure and Icecast stream disconnection

AudioEngine.stop() only called Thread.interrupt(), which doesn't
interrupt blocking InputStream.read() on OkHttp streams. This caused
audio to continue after stop and blocked subsequent play attempts
(old job never completed). Now closes timedStream to force the
blocking read to fail.

Removed LowLatencySocketFactory (16KB receive buffer) which triggered
Icecast slow-client disconnection on burst-on-connect. Force HTTP/1.1
to avoid HTTP/2 negotiation issues with Icecast servers.

Also fixed: awaitEngine() SharedFlow collector coroutine leak, and
added MAX_CATCHUP_FRAMES safety cap to prevent infinite frame skipping.

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-11 18:14:13 -04:00
parent 8872b9de96
commit a4c0fa9a6e
3 changed files with 37 additions and 34 deletions

View File

@@ -5,6 +5,7 @@ import android.media.AudioFormat
import android.media.AudioTrack import android.media.AudioTrack
import android.media.MediaCodec import android.media.MediaCodec
import android.media.MediaFormat import android.media.MediaFormat
import android.util.Log
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
import java.io.InputStream import java.io.InputStream
@@ -34,34 +35,43 @@ class AudioEngine(
private var currentCodec: MediaCodec? = null private var currentCodec: MediaCodec? = null
@Volatile @Volatile
private var catchingUp = true private var catchingUp = true
private var catchupFramesSkipped = 0
@Volatile @Volatile
private var timedStream: TimedInputStream? = null private var timedStream: TimedInputStream? = null
private var presentationTimeUs = 0L private var presentationTimeUs = 0L
fun start() { fun start() {
Log.i(TAG, "start() url=$url")
running = true running = true
catchingUp = true catchingUp = true
catchupFramesSkipped = 0
presentationTimeUs = 0L presentationTimeUs = 0L
thread = Thread({ thread = Thread({
try { try {
runPipeline() runPipeline()
} catch (e: Exception) { } catch (e: Exception) {
if (running) { if (running) {
Log.e(TAG, "Pipeline error (active): ${e.message}", e)
val error = when (e) { val error = when (e) {
is ConnectionFailed -> is ConnectionFailed ->
EngineError.ConnectionFailed(e) EngineError.ConnectionFailed(e)
else -> EngineError.DecoderError(e) else -> EngineError.DecoderError(e)
} }
_events.tryEmit(AudioEngineEvent.Error(error)) _events.tryEmit(AudioEngineEvent.Error(error))
} else {
Log.i(TAG, "Pipeline stopped: ${e.message}")
} }
} finally { } finally {
Log.i(TAG, "Pipeline finally — emitting Stopped")
_events.tryEmit(AudioEngineEvent.Stopped) _events.tryEmit(AudioEngineEvent.Stopped)
} }
}, "AudioEngine").apply { start() } }, "AudioEngine").apply { start() }
} }
fun stop() { fun stop() {
Log.i(TAG, "stop() called, running=$running, thread=${thread?.name}")
running = false running = false
try { timedStream?.close() } catch (_: Exception) {}
thread?.interrupt() thread?.interrupt()
thread = null thread = null
} }
@@ -71,8 +81,10 @@ class AudioEngine(
} }
private fun runPipeline() { private fun runPipeline() {
Log.i(TAG, "runPipeline() connecting to $url")
val connection = StreamConnection(url) val connection = StreamConnection(url)
connection.open() connection.open()
Log.i(TAG, "Connected: metaint=${connection.metaint}, info=${connection.streamInfo}")
val tStream = TimedInputStream(connection.inputStream!!) val tStream = TimedInputStream(connection.inputStream!!)
timedStream = tStream timedStream = tStream
@@ -152,8 +164,12 @@ class AudioEngine(
private fun decodeToPcm(codec: MediaCodec, mp3Frame: ByteArray, audioTrack: AudioTrack) { private fun decodeToPcm(codec: MediaCodec, mp3Frame: ByteArray, audioTrack: AudioTrack) {
if (catchingUp) { if (catchingUp) {
catchupFramesSkipped++
val lastReadMs = timedStream?.lastReadDurationMs ?: 0L val lastReadMs = timedStream?.lastReadDurationMs ?: 0L
if (lastReadMs >= CATCHUP_THRESHOLD_MS) { if (lastReadMs >= CATCHUP_THRESHOLD_MS || catchupFramesSkipped >= MAX_CATCHUP_FRAMES) {
if (catchupFramesSkipped >= MAX_CATCHUP_FRAMES) {
Log.w(TAG, "Catchup cap reached after $catchupFramesSkipped frames, starting playback")
}
catchingUp = false catchingUp = false
} else { } else {
return return
@@ -205,8 +221,10 @@ class AudioEngine(
} }
companion object { companion object {
private const val TAG = "AudioEngine"
private const val FRAMES_PER_SECOND = 38 private const val FRAMES_PER_SECOND = 38
private const val CATCHUP_THRESHOLD_MS = 30L private const val CATCHUP_THRESHOLD_MS = 30L
private const val MAX_CATCHUP_FRAMES = FRAMES_PER_SECOND * 5 // 5 seconds max skip
private const val FRAME_DURATION_US = 26_122L // 1152 samples at 44100 Hz private const val FRAME_DURATION_US = 26_122L // 1152 samples at 44100 Hz
} }
} }

View File

@@ -1,40 +1,12 @@
package xyz.cottongin.radio247.audio package xyz.cottongin.radio247.audio
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Protocol
import okhttp3.Request import okhttp3.Request
import okhttp3.Response import okhttp3.Response
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
import java.net.InetAddress
import java.net.Socket
import java.time.Duration import java.time.Duration
import javax.net.SocketFactory
private const val SOCKET_RECV_BUF = 16_384
private class LowLatencySocketFactory : SocketFactory() {
private val delegate = getDefault()
override fun createSocket(): Socket =
delegate.createSocket().applyOpts()
override fun createSocket(host: String, port: Int): Socket =
delegate.createSocket(host, port).applyOpts()
override fun createSocket(host: String, port: Int, localAddr: InetAddress, localPort: Int): Socket =
delegate.createSocket(host, port, localAddr, localPort).applyOpts()
override fun createSocket(host: InetAddress, port: Int): Socket =
delegate.createSocket(host, port).applyOpts()
override fun createSocket(host: InetAddress, port: Int, localAddr: InetAddress, localPort: Int): Socket =
delegate.createSocket(host, port, localAddr, localPort).applyOpts()
private fun Socket.applyOpts(): Socket = apply {
receiveBufferSize = SOCKET_RECV_BUF
tcpNoDelay = true
}
}
class ConnectionFailed(message: String, cause: Throwable? = null) : Exception(message, cause) class ConnectionFailed(message: String, cause: Throwable? = null) : Exception(message, cause)
@@ -46,7 +18,7 @@ data class StreamInfo(
class StreamConnection(private val url: String) { class StreamConnection(private val url: String) {
private val client = OkHttpClient.Builder() private val client = OkHttpClient.Builder()
.socketFactory(LowLatencySocketFactory()) .protocols(listOf(Protocol.HTTP_1_1))
.readTimeout(Duration.ofSeconds(30)) .readTimeout(Duration.ofSeconds(30))
.build() .build()

View File

@@ -10,6 +10,7 @@ import android.net.NetworkCapabilities
import android.net.NetworkRequest import android.net.NetworkRequest
import android.os.IBinder import android.os.IBinder
import android.os.PowerManager import android.os.PowerManager
import android.util.Log
import androidx.lifecycle.LifecycleService import androidx.lifecycle.LifecycleService
import android.support.v4.media.session.MediaSessionCompat import android.support.v4.media.session.MediaSessionCompat
import xyz.cottongin.radio247.RadioApplication import xyz.cottongin.radio247.RadioApplication
@@ -43,6 +44,7 @@ class ConnectionFailedException(cause: Throwable) : Exception("Connection failed
class RadioPlaybackService : LifecycleService() { class RadioPlaybackService : LifecycleService() {
companion object { companion object {
private const val TAG = "RadioPlayback"
const val ACTION_PLAY = "xyz.cottongin.radio247.PLAY" const val ACTION_PLAY = "xyz.cottongin.radio247.PLAY"
const val ACTION_STOP = "xyz.cottongin.radio247.STOP" const val ACTION_STOP = "xyz.cottongin.radio247.STOP"
const val ACTION_PAUSE = "xyz.cottongin.radio247.PAUSE" const val ACTION_PAUSE = "xyz.cottongin.radio247.PAUSE"
@@ -95,6 +97,7 @@ class RadioPlaybackService : LifecycleService() {
private var playJob: Job? = null private var playJob: Job? = null
private fun transition(newState: PlaybackState) { private fun transition(newState: PlaybackState) {
Log.i(TAG, "transition → $newState")
controller.updateState(newState) controller.updateState(newState)
} }
@@ -125,13 +128,16 @@ class RadioPlaybackService : LifecycleService() {
val oldJob = playJob val oldJob = playJob
val currentState = controller.state.value val currentState = controller.state.value
val isResume = currentState is PlaybackState.Paused && currentState.station.id == stationId val isResume = currentState is PlaybackState.Paused && currentState.station.id == stationId
Log.i(TAG, "launchPlay stationId=$stationId isResume=$isResume hasOldJob=${oldJob != null}")
playJob = serviceScope.launch { playJob = serviceScope.launch {
oldJob?.let { oldJob?.let {
Log.i(TAG, "Stopping old engine and joining old job…")
if (!isResume) { if (!isResume) {
stayConnected = false stayConnected = false
} }
engine?.stop() engine?.stop()
it.join() it.join()
Log.i(TAG, "Old job joined")
} }
stayConnected = app.preferences.stayConnected.first() stayConnected = app.preferences.stayConnected.first()
val station = stationDao.getStationById(stationId) val station = stationDao.getStationById(stationId)
@@ -179,11 +185,13 @@ class RadioPlaybackService : LifecycleService() {
try { try {
val urls = app.streamResolver.resolveUrls(station) val urls = app.streamResolver.resolveUrls(station)
Log.i(TAG, "handlePlay resolved ${urls.size} URLs for '${station.name}': ${urls.take(3)}")
startEngine(station, urls) startEngine(station, urls)
if (stayConnected) { if (stayConnected) {
reconnectLoop(station) reconnectLoop(station)
} }
} catch (_: Exception) { } catch (e: Exception) {
Log.w(TAG, "handlePlay caught: ${e.message}", e)
if (stayConnected) { if (stayConnected) {
reconnectLoop(station) reconnectLoop(station)
} }
@@ -219,6 +227,7 @@ class RadioPlaybackService : LifecycleService() {
} }
private fun handleStop() { private fun handleStop() {
Log.i(TAG, "handleStop()")
stayConnected = false stayConnected = false
retryImmediatelyOnNetwork = false retryImmediatelyOnNetwork = false
engine?.stop() engine?.stop()
@@ -307,7 +316,7 @@ class RadioPlaybackService : LifecycleService() {
val deferred = CompletableDeferred<Unit>() val deferred = CompletableDeferred<Unit>()
val connectionStartedAt = System.currentTimeMillis() val connectionStartedAt = System.currentTimeMillis()
serviceScope.launch { val collectJob = serviceScope.launch {
engine!!.events.collect { event -> engine!!.events.collect { event ->
when (event) { when (event) {
is AudioEngineEvent.Started -> { is AudioEngineEvent.Started -> {
@@ -363,7 +372,11 @@ class RadioPlaybackService : LifecycleService() {
deferred.completeExceptionally(Exception("Event flow completed unexpectedly")) deferred.completeExceptionally(Exception("Event flow completed unexpectedly"))
} }
} }
deferred.await() try {
deferred.await()
} finally {
collectJob.cancel()
}
} }
private suspend fun reconnectLoop(station: Station) { private suspend fun reconnectLoop(station: Station) {