fix: star visibility, station switching, skip-ahead, and latency optimizations

- Star icons now use distinct tint colors (primary vs faded) for clear state
- Station switching no longer races — old playback job is cancelled before new
- Skip-ahead drops ~1s of buffered audio per tap via atomic counter
- Custom SocketFactory with SO_RCVBUF=16KB and TCP_NODELAY for minimal TCP buffering
- Catch-up drain: discards pre-buffered frames until network reads block (live edge)
- AudioTrack PERFORMANCE_MODE_LOW_LATENCY for smallest hardware buffer

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-10 04:41:40 -04:00
parent 6144de6b08
commit 49bbb54bb9
10 changed files with 376 additions and 72 deletions

View File

@@ -7,6 +7,8 @@ import android.media.MediaCodec
import android.media.MediaFormat
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
class AudioEngine(
@@ -19,12 +21,25 @@ class AudioEngine(
private var thread: Thread? = null
@Volatile
private var running = false
private val _estimatedLatencyMs = AtomicLong(0)
private val pendingSkips = AtomicInteger(0)
private val _estimatedLatencyMs = AtomicLong(0)
val estimatedLatencyMs: Long get() = _estimatedLatencyMs.get()
@Volatile
private var currentRingBuffer: RingBuffer? = null
@Volatile
private var currentAudioTrack: AudioTrack? = null
@Volatile
private var currentCodec: MediaCodec? = null
@Volatile
private var catchingUp = true
@Volatile
private var timedStream: TimedInputStream? = null
fun start() {
running = true
catchingUp = true
thread = Thread({
try {
runPipeline()
@@ -49,10 +64,17 @@ class AudioEngine(
thread = null
}
fun skipAhead() {
pendingSkips.incrementAndGet()
}
private fun runPipeline() {
val connection = StreamConnection(url)
connection.open()
val tStream = TimedInputStream(connection.inputStream!!)
timedStream = tStream
val sampleRate = 44100
val channelConfig = AudioFormat.CHANNEL_OUT_STEREO
val encoding = AudioFormat.ENCODING_PCM_16BIT
@@ -73,6 +95,7 @@ class AudioEngine(
.build()
)
.setBufferSizeInBytes(minBuf)
.setPerformanceMode(AudioTrack.PERFORMANCE_MODE_LOW_LATENCY)
.setTransferMode(AudioTrack.MODE_STREAM)
.build()
@@ -82,6 +105,9 @@ class AudioEngine(
codec.start()
audioTrack.play()
currentAudioTrack = audioTrack
currentCodec = codec
_events.tryEmit(AudioEngineEvent.Started)
try {
@@ -90,13 +116,14 @@ class AudioEngine(
val ringBuffer = RingBuffer(bufferFrames) { mp3Frame ->
decodeToPcm(codec, mp3Frame, audioTrack)
}
currentRingBuffer = ringBuffer
val frameSync = Mp3FrameSync { mp3Frame ->
ringBuffer.write(mp3Frame)
}
val icyParser = IcyParser(
input = connection.inputStream!!,
input = tStream,
metaint = connection.metaint,
onAudioData = { buf, off, len -> frameSync.feed(buf, off, len) },
onMetadata = { _events.tryEmit(AudioEngineEvent.MetadataChanged(it)) }
@@ -104,11 +131,14 @@ class AudioEngine(
icyParser.readAll()
// Stream ended normally
ringBuffer.flush()
frameSync.flush()
_events.tryEmit(AudioEngineEvent.Error(EngineError.StreamEnded))
} finally {
timedStream = null
currentRingBuffer = null
currentCodec = null
currentAudioTrack = null
codec.stop()
codec.release()
audioTrack.stop()
@@ -118,6 +148,26 @@ class AudioEngine(
}
private fun decodeToPcm(codec: MediaCodec, mp3Frame: ByteArray, audioTrack: AudioTrack) {
if (catchingUp) {
val lastReadMs = timedStream?.lastReadDurationMs ?: 0L
if (lastReadMs >= CATCHUP_THRESHOLD_MS) {
catchingUp = false
} else {
return
}
}
val skips = pendingSkips.getAndSet(0)
if (skips > 0) {
val framesToDrop = skips * FRAMES_PER_SECOND
currentRingBuffer?.drop(framesToDrop)
audioTrack.pause()
audioTrack.flush()
audioTrack.play()
drainCodecOutput(codec)
return
}
val inIdx = codec.dequeueInputBuffer(1000)
if (inIdx >= 0) {
val inBuf = codec.getInputBuffer(inIdx)!!
@@ -139,4 +189,41 @@ class AudioEngine(
outIdx = codec.dequeueOutputBuffer(bufferInfo, 0)
}
}
private fun drainCodecOutput(codec: MediaCodec) {
val bufferInfo = MediaCodec.BufferInfo()
var outIdx = codec.dequeueOutputBuffer(bufferInfo, 0)
while (outIdx >= 0) {
codec.releaseOutputBuffer(outIdx, false)
outIdx = codec.dequeueOutputBuffer(bufferInfo, 0)
}
}
companion object {
private const val FRAMES_PER_SECOND = 38
private const val CATCHUP_THRESHOLD_MS = 30L
}
}
private class TimedInputStream(private val delegate: InputStream) : InputStream() {
@Volatile
var lastReadDurationMs: Long = 0L
private set
override fun read(): Int {
val start = System.nanoTime()
val result = delegate.read()
lastReadDurationMs = (System.nanoTime() - start) / 1_000_000
return result
}
override fun read(b: ByteArray, off: Int, len: Int): Int {
val start = System.nanoTime()
val result = delegate.read(b, off, len)
lastReadDurationMs = (System.nanoTime() - start) / 1_000_000
return result
}
override fun available(): Int = delegate.available()
override fun close() = delegate.close()
}

View File

@@ -22,4 +22,16 @@ class RingBuffer(
onFrame(buffer.removeFirst())
}
}
fun clear() {
buffer.clear()
}
fun drop(maxCount: Int): Int {
val toDrop = minOf(maxCount, buffer.size)
repeat(toDrop) { buffer.removeFirst() }
return toDrop
}
val size: Int get() = buffer.size
}

View File

@@ -5,12 +5,42 @@ import okhttp3.Request
import okhttp3.Response
import java.io.IOException
import java.io.InputStream
import java.net.InetAddress
import java.net.Socket
import java.time.Duration
import javax.net.SocketFactory
private const val SOCKET_RECV_BUF = 16_384
private class LowLatencySocketFactory : SocketFactory() {
private val delegate = getDefault()
override fun createSocket(): Socket =
delegate.createSocket().applyOpts()
override fun createSocket(host: String, port: Int): Socket =
delegate.createSocket(host, port).applyOpts()
override fun createSocket(host: String, port: Int, localAddr: InetAddress, localPort: Int): Socket =
delegate.createSocket(host, port, localAddr, localPort).applyOpts()
override fun createSocket(host: InetAddress, port: Int): Socket =
delegate.createSocket(host, port).applyOpts()
override fun createSocket(host: InetAddress, port: Int, localAddr: InetAddress, localPort: Int): Socket =
delegate.createSocket(host, port, localAddr, localPort).applyOpts()
private fun Socket.applyOpts(): Socket = apply {
receiveBufferSize = SOCKET_RECV_BUF
tcpNoDelay = true
}
}
class ConnectionFailed(message: String, cause: Throwable? = null) : Exception(message, cause)
class StreamConnection(private val url: String) {
private val client = OkHttpClient.Builder()
.socketFactory(LowLatencySocketFactory())
.readTimeout(Duration.ofSeconds(30))
.build()

View File

@@ -31,6 +31,13 @@ class RadioController(
application.startService(intent)
}
fun seekToLive() {
val intent = Intent(application, RadioPlaybackService::class.java).apply {
action = RadioPlaybackService.ACTION_SEEK_LIVE
}
application.startService(intent)
}
// Called by the service to update state
internal fun updateState(state: PlaybackState) {
_state.value = state

View File

@@ -34,12 +34,15 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
class RadioPlaybackService : LifecycleService() {
companion object {
const val ACTION_PLAY = "xyz.cottongin.radio247.PLAY"
const val ACTION_STOP = "xyz.cottongin.radio247.STOP"
const val ACTION_SEEK_LIVE = "xyz.cottongin.radio247.SEEK_LIVE"
const val EXTRA_STATION_ID = "station_id"
}
@@ -85,6 +88,8 @@ class RadioPlaybackService : LifecycleService() {
@Volatile
private var retryImmediatelyOnNetwork = false
private var playJob: Job? = null
override fun onCreate() {
super.onCreate()
notificationHelper.createChannel()
@@ -95,24 +100,36 @@ class RadioPlaybackService : LifecycleService() {
ACTION_PLAY -> {
val stationId = intent.getLongExtra(EXTRA_STATION_ID, -1L)
if (stationId >= 0) {
serviceScope.launch {
val station = stationDao.getStationById(stationId)
if (station != null) {
handlePlay(station)
} else {
stopSelf()
}
}
launchPlay(stationId)
} else {
stopSelf()
}
}
ACTION_SEEK_LIVE -> handleSeekLive()
ACTION_STOP -> handleStop()
else -> stopSelf()
}
return START_NOT_STICKY
}
private fun launchPlay(stationId: Long) {
val oldJob = playJob
playJob = serviceScope.launch {
oldJob?.let {
stayConnected = false
engine?.stop()
it.join()
}
stayConnected = app.preferences.stayConnected.first()
val station = stationDao.getStationById(stationId)
if (station != null) {
handlePlay(station)
} else {
stopSelf()
}
}
}
override fun onBind(intent: Intent): IBinder? = null
override fun onDestroy() {
@@ -133,7 +150,6 @@ class RadioPlaybackService : LifecycleService() {
}
private suspend fun handlePlay(station: Station) {
stayConnected = app.preferences.stayConnected.first()
sessionStartedAt = System.currentTimeMillis()
listeningSessionId = listeningSessionDao.insert(
@@ -159,12 +175,19 @@ class RadioPlaybackService : LifecycleService() {
} finally {
endConnectionSpan()
endListeningSession()
cleanup()
stopForeground(STOP_FOREGROUND_REMOVE)
stopSelf()
val isActiveJob = playJob == coroutineContext[Job]
if (isActiveJob) {
cleanup()
stopForeground(STOP_FOREGROUND_REMOVE)
stopSelf()
}
}
}
private fun handleSeekLive() {
engine?.skipAhead()
}
private fun handleStop() {
stayConnected = false
retryImmediatelyOnNetwork = false
@@ -237,45 +260,44 @@ class RadioPlaybackService : LifecycleService() {
engine!!.start()
serviceScope.launch collector@ {
try {
engine!!.events.collect { event ->
when (event) {
is AudioEngineEvent.MetadataChanged -> {
currentMetadata = event.metadata
val playingState = controller.state.value
if (playingState is PlaybackState.Playing) {
controller.updateState(
playingState.copy(metadata = event.metadata)
)
}
updateNotification(station, event.metadata, false)
persistMetadataSnapshot(station.id, event.metadata)
val collectorJob = serviceScope.launch collector@ {
engine!!.events.collect { event ->
when (event) {
is AudioEngineEvent.MetadataChanged -> {
currentMetadata = event.metadata
val playingState = controller.state.value
if (playingState is PlaybackState.Playing) {
controller.updateState(
playingState.copy(metadata = event.metadata)
)
}
is AudioEngineEvent.Started -> {
controller.updateLatency(engine!!.estimatedLatencyMs)
}
is AudioEngineEvent.Error -> {
endConnectionSpan()
engine?.stop()
engine = null
val throwable = when (val cause = event.cause) {
is EngineError.ConnectionFailed -> cause.cause
is EngineError.StreamEnded -> Exception("Stream ended")
is EngineError.DecoderError -> cause.cause
is EngineError.AudioOutputError -> cause.cause
}
deferred.completeExceptionally(throwable)
}
is AudioEngineEvent.Stopped -> {
deferred.complete(Unit)
updateNotification(station, event.metadata, false)
persistMetadataSnapshot(station.id, event.metadata)
}
is AudioEngineEvent.Started -> {
controller.updateLatency(engine!!.estimatedLatencyMs)
}
is AudioEngineEvent.Error -> {
endConnectionSpan()
engine?.stop()
engine = null
val throwable = when (val cause = event.cause) {
is EngineError.ConnectionFailed -> cause.cause
is EngineError.StreamEnded -> Exception("Stream ended")
is EngineError.DecoderError -> cause.cause
is EngineError.AudioOutputError -> cause.cause
}
deferred.completeExceptionally(throwable)
return@collect
}
is AudioEngineEvent.Stopped -> {
deferred.complete(Unit)
return@collect
}
}
} catch (e: Exception) {
if (!deferred.isCompleted) {
deferred.completeExceptionally(e)
}
}
if (!deferred.isCompleted) {
deferred.completeExceptionally(Exception("Event flow completed unexpectedly"))
}
}
}

View File

@@ -12,15 +12,20 @@ import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.size
import androidx.compose.foundation.layout.width
import androidx.compose.foundation.rememberScrollState
import androidx.compose.foundation.verticalScroll
import androidx.compose.material.icons.Icons
import androidx.compose.material.icons.automirrored.filled.ArrowBack
import androidx.compose.material.icons.filled.FastForward
import androidx.compose.material.icons.filled.Stop
import androidx.compose.material3.Button
import androidx.compose.material3.ButtonDefaults
import androidx.compose.material3.Card
import androidx.compose.material3.CardDefaults
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.ExperimentalMaterial3Api
import androidx.compose.material3.FilledTonalButton
import androidx.compose.material3.Icon
import androidx.compose.material3.IconButton
import androidx.compose.material3.MaterialTheme
@@ -162,6 +167,41 @@ fun NowPlayingScreen(
Spacer(modifier = Modifier.height(24.dp))
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.Center,
verticalAlignment = Alignment.CenterVertically
) {
IconButton(
onClick = { viewModel.skipAhead() },
modifier = Modifier.size(64.dp),
enabled = state is PlaybackState.Playing
) {
Icon(
Icons.Filled.FastForward,
contentDescription = "Skip ahead ~1s",
modifier = Modifier.size(40.dp),
tint = if (state is PlaybackState.Playing)
MaterialTheme.colorScheme.primary
else MaterialTheme.colorScheme.onSurfaceVariant.copy(alpha = 0.4f)
)
}
Spacer(modifier = Modifier.width(24.dp))
IconButton(
onClick = { viewModel.stop() },
modifier = Modifier.size(64.dp)
) {
Icon(
Icons.Filled.Stop,
contentDescription = "Stop",
modifier = Modifier.size(40.dp),
tint = MaterialTheme.colorScheme.error
)
}
}
Spacer(modifier = Modifier.height(24.dp))
Row(
modifier = Modifier.fillMaxWidth(),
verticalAlignment = Alignment.CenterVertically,
@@ -192,17 +232,6 @@ fun NowPlayingScreen(
)
Spacer(modifier = Modifier.height(32.dp))
Button(
onClick = { viewModel.stop() },
modifier = Modifier
.fillMaxWidth()
.height(56.dp)
) {
Text("STOP")
}
Spacer(modifier = Modifier.height(32.dp))
}
}

View File

@@ -99,6 +99,10 @@ class NowPlayingViewModel(application: Application) : AndroidViewModel(applicati
controller.stop()
}
fun skipAhead() {
controller.seekToLive()
}
fun toggleStayConnected() {
viewModelScope.launch {
val current = stayConnected.value

View File

@@ -266,15 +266,17 @@ private fun PlaylistSectionHeader(
style = MaterialTheme.typography.titleSmall,
modifier = Modifier.weight(1f)
)
IconButton(
onClick = { onToggleStar() },
modifier = Modifier.size(32.dp)
) {
Icon(
imageVector = if (playlist.starred) Icons.Default.Star else Icons.Outlined.Star,
contentDescription = if (playlist.starred) "Unstar" else "Star"
)
}
IconButton(
onClick = { onToggleStar() },
modifier = Modifier.size(32.dp)
) {
Icon(
imageVector = if (playlist.starred) Icons.Filled.Star else Icons.Outlined.Star,
contentDescription = if (playlist.starred) "Unstar" else "Star",
tint = if (playlist.starred) MaterialTheme.colorScheme.primary
else MaterialTheme.colorScheme.onSurfaceVariant.copy(alpha = 0.4f)
)
}
}
}
@@ -306,8 +308,10 @@ private fun StationRow(
modifier = Modifier.size(36.dp)
) {
Icon(
imageVector = if (station.starred) Icons.Default.Star else Icons.Outlined.Star,
contentDescription = if (station.starred) "Unstar" else "Star"
imageVector = if (station.starred) Icons.Filled.Star else Icons.Outlined.Star,
contentDescription = if (station.starred) "Unstar" else "Star",
tint = if (station.starred) MaterialTheme.colorScheme.primary
else MaterialTheme.colorScheme.onSurfaceVariant.copy(alpha = 0.4f)
)
}
Spacer(modifier = Modifier.width(8.dp))