- Fixed race condition in WebSocketService when handling intercepted messages after a disconnect.

- Changed the way errors are handled so they are, in general, treated as non-fatals.
- Fixed a bug in the offline track interceptor that would erroneously intercept requests it shouldn't
This commit is contained in:
casey langen 2017-06-13 00:26:28 -07:00
parent 3bce961cb5
commit 6ce475b9ac
6 changed files with 92 additions and 66 deletions

View File

@ -27,8 +27,8 @@ public abstract class OfflineDb extends RoomDatabase {
final String category = message.getStringOption(Messages.Key.CATEGORY);
if (Messages.Category.OFFLINE.equals(category)) {
queryTracks(message, responder);
return true;
}
return true;
}
return false;
});
@ -73,7 +73,6 @@ public abstract class OfflineDb extends RoomDatabase {
options.put(Messages.Key.COUNT, dao.countTracks());
}
else {
final int offset = message.getIntOption(Messages.Key.OFFSET, -1);
final int limit = message.getIntOption(Messages.Key.LIMIT, -1);
@ -90,8 +89,10 @@ public abstract class OfflineDb extends RoomDatabase {
options.put(Messages.Key.DATA, tracks);
responder.respond(SocketMessage.Builder
.respondTo(message).withOptions(options).build());
final SocketMessage response = SocketMessage.Builder
.respondTo(message).withOptions(options).build();
responder.respond(response);
return true;
})

View File

@ -12,6 +12,8 @@ import android.util.Log;
import org.json.JSONArray;
import org.json.JSONObject;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.net.URLEncoder;
import java.util.ArrayList;
@ -474,7 +476,6 @@ public class StreamingPlaybackService implements PlaybackService {
return current / getMaxSystemVolume();
}
private float getMaxSystemVolume() {
return audioManager.getStreamMaxVolume(AudioManager.STREAM_MUSIC);
}
@ -745,19 +746,20 @@ public class StreamingPlaybackService implements PlaybackService {
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.map(StreamingPlaybackService::extractTrackFromMessage)
.doOnNext(track -> {
if (params == this.params && context.currentIndex == currentIndex) {
if (context.nextMetadata == null) {
context.nextIndex = nextIndex;
context.nextMetadata = track;
prefetchNextTrackAudio();
.subscribe(
(track) -> {
if (params == StreamingPlaybackService.this.params && context.currentIndex == currentIndex) {
if (context.nextMetadata == null) {
context.nextIndex = nextIndex;
context.nextMetadata = track;
prefetchNextTrackAudio();
}
}
}
})
.doOnError(error -> {
Log.e(TAG, "failed to prefetch next track!", error);
})
.subscribe();
},
(error) -> {
Log.e(TAG, "failed to prefetch next track!", error);
});
}
}
}
@ -783,31 +785,37 @@ public class StreamingPlaybackService implements PlaybackService {
.flatMap(response -> getQueueCount(context, response))
.concatMap(count -> getCurrentAndNextTrackMessages(context, count))
.map(StreamingPlaybackService::extractTrackFromMessage)
.doOnNext(track -> {
if (context.currentMetadata == null) {
context.currentMetadata = track;
}
else {
context.nextMetadata = track;
}
})
.doOnComplete(() -> {
if (this.params == params && this.context == context) {
notifyEventListeners();
final String uri = getUri(this.context.currentMetadata);
if (uri != null) {
this.context.currentPlayer = PlayerWrapper.Companion.newInstance();
this.context.currentPlayer.setOnStateChangedListener(onCurrentPlayerStateChanged);
this.context.currentPlayer.play(uri, this.context.currentMetadata);
.subscribe(
track -> {
if (context.currentMetadata == null) {
context.currentMetadata = track;
}
}
})
.doOnError(error -> {
Log.e(TAG, "failed to load track to play!", error);
setState(PlaybackState.Stopped);
})
.subscribe();
else {
context.nextMetadata = track;
}
},
error -> {
Log.e(TAG, "failed to load track to play!", error);
setState(PlaybackState.Stopped);
},
() -> {
if (this.params == params && this.context == context) {
notifyEventListeners();
final String uri = getUri(this.context.currentMetadata);
if (uri != null) {
this.context.currentPlayer = PlayerWrapper.Companion.newInstance();
this.context.currentPlayer.setOnStateChangedListener(onCurrentPlayerStateChanged);
this.context.currentPlayer.play(uri, this.context.currentMetadata);
}
}
else {
Log.d(TAG, "onComplete fired, but params/context changed. discarding!");
}
});
}
private void cancelScheduledPausedSleep() {
@ -826,18 +834,19 @@ public class StreamingPlaybackService implements PlaybackService {
this.wss.send(query, this.wssClient)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(response -> {
if (params == this.params) {
final JSONArray data = response.getJsonArrayOption(Messages.Key.DATA);
for (int i = 0; i < data.length(); i++) {
trackMetadataCache.put(start + i, data.getJSONObject(i));
.subscribe(
response -> {
if (params == this.params) {
final JSONArray data = response.getJsonArrayOption(Messages.Key.DATA);
for (int i = 0; i < data.length(); i++) {
trackMetadataCache.put(start + i, data.getJSONObject(i));
}
}
}
})
.doOnError(error -> {
Log.e(TAG, "failed to prefetch track metadata!", error);
})
.subscribe();
},
error -> {
Log.e(TAG, "failed to prefetch track metadata!", error);
});
}
private TrackListSlidingWindow.QueryFactory queryFactory = new TrackListSlidingWindow.QueryFactory() {

View File

@ -86,6 +86,10 @@ public class PlayQueueActivity extends WebSocketActivityBase {
protected void onResume() {
this.tracks.resume(); /* needs to happen before */
super.onResume();
if (offlineQueue) {
tracks.requery();
}
}
@Override
@ -118,7 +122,7 @@ public class PlayQueueActivity extends WebSocketActivityBase {
private final WebSocketService.Client webSocketClient = new WebSocketService.Client() {
@Override
public void onStateChanged(WebSocketService.State newState, WebSocketService.State oldState) {
if (newState == WebSocketService.State.Connected || offlineQueue) {
if (newState == WebSocketService.State.Connected) {
tracks.requery();
}
}

View File

@ -118,6 +118,7 @@ public class TrackListActivity extends WebSocketActivityBase implements Filterab
protected void onResume() {
this.tracks.resume(); /* needs to happen before */
super.onResume();
requeryIfViewingOfflineCache();
}
@Override
@ -139,7 +140,7 @@ public class TrackListActivity extends WebSocketActivityBase implements Filterab
private WebSocketService.Client socketServiceClient = new WebSocketService.Client() {
@Override
public void onStateChanged(WebSocketService.State newState, WebSocketService.State oldState) {
if (canRequery()) {
if (getWebSocketService().getState() == WebSocketService.State.Connected) {
filterDebouncer.cancel();
tracks.requery();
}
@ -240,14 +241,14 @@ public class TrackListActivity extends WebSocketActivityBase implements Filterab
}
}
private static boolean isValidCategory(final String categoryType, long categoryId) {
return categoryType != null && categoryType.length() > 0 && categoryId != -1;
private void requeryIfViewingOfflineCache() {
if (Messages.Category.OFFLINE.equals(categoryType)) {
tracks.requery();
}
}
private boolean canRequery() {
return
getWebSocketService().getState() == WebSocketService.State.Connected ||
Messages.Category.OFFLINE.equals(categoryType);
private static boolean isValidCategory(final String categoryType, long categoryId) {
return categoryType != null && categoryType.length() > 0 && categoryId != -1;
}
private QueryFactory createCategoryQueryFactory(

View File

@ -49,7 +49,7 @@ public class WebSocketService {
private static final int MESSAGE_BASE = 0xcafedead;
private static final int MESSAGE_CONNECT_THREAD_FINISHED = MESSAGE_BASE + 0;
private static final int MESSAGE_MESSAGE_RECEIVED = MESSAGE_BASE + 1;
private static final int MESSAGE_RECEIVED = MESSAGE_BASE + 1;
private static final int MESSAGE_REMOVE_OLD_CALLBACKS = MESSAGE_BASE + 2;
private static final int MESSAGE_AUTO_RECONNECT = MESSAGE_BASE + 3;
private static final int MESSAGE_SCHEDULE_PING = MESSAGE_BASE + 4;
@ -108,7 +108,7 @@ public class WebSocketService {
}
return true;
}
else if (message.what == MESSAGE_MESSAGE_RECEIVED) {
else if (message.what == MESSAGE_RECEIVED) {
if (clients != null) {
final SocketMessage msg = (SocketMessage) message.obj;
@ -160,6 +160,7 @@ public class WebSocketService {
private static class MessageResultDescriptor {
long id;
long enqueueTime;
boolean intercepted;
Client client;
MessageResultCallback callback;
MessageErrorCallback error;
@ -319,12 +320,16 @@ public class WebSocketService {
mrd.enqueueTime = System.currentTimeMillis();
mrd.client = client;
mrd.callback = callback;
mrd.intercepted = intercepted;
messageCallbacks.put(message.getId(), mrd);
}
if (!intercepted) {
this.socket.sendText(message.toString());
}
else {
Log.d(TAG, "send: message intercepted with id " + String.valueOf(id));
}
return id;
}
@ -365,6 +370,7 @@ public class WebSocketService {
mrd.id = NEXT_ID.incrementAndGet();
mrd.enqueueTime = System.currentTimeMillis();
mrd.client = client;
mrd.intercepted = intercepted;
mrd.callback = (SocketMessage message) -> {
emitter.onNext(message);
@ -414,7 +420,7 @@ public class WebSocketService {
this.socket = null;
}
this.messageCallbacks.clear();
removeNonInterceptedCallbacks();
setState(State.Disconnected);
if (autoReconnect) {
@ -427,6 +433,10 @@ public class WebSocketService {
}
}
private void removeNonInterceptedCallbacks() {
removeCallbacks((mrd) -> !mrd.intercepted);
}
private void removeInternalCallbacks() {
removeCallbacks((MessageResultDescriptor mrd) -> mrd.client == INTERNAL_CLIENT);
}
@ -453,6 +463,7 @@ public class WebSocketService {
if (mdr.error != null) {
mdr.error.onMessageError();
}
it.remove();
}
}
@ -534,7 +545,7 @@ public class WebSocketService {
/* post to the back of the queue in case the interceptor responded immediately;
we need to ensure all of the request book-keeping has been finished. */
handler.post(() -> {
handler.sendMessage(Message.obtain(handler, MESSAGE_MESSAGE_RECEIVED, response));
handler.sendMessage(Message.obtain(handler, MESSAGE_RECEIVED, response));
});
};
@ -548,7 +559,7 @@ public class WebSocketService {
handler, MESSAGE_CONNECT_THREAD_FINISHED, websocket));
}
else {
handler.sendMessage(Message.obtain(handler, MESSAGE_MESSAGE_RECEIVED, message));
handler.sendMessage(Message.obtain(handler, MESSAGE_RECEIVED, message));
}
}
}

View File

@ -65,7 +65,7 @@ TranscodingDataStream::TranscodingDataStream(
/* note that we purposely under-estimate the content length by a couple
seconds. we do this because http clients seem to be more likely to be
throw a fit if we over estimate, instead of under-estimate. */
this->length = (PositionType)((this->decoder->GetDuration() - 2.0) * 1000.0 * (float)bitrate / 8.0);
this->length = (PositionType)((this->decoder->GetDuration() - 1.0) * 1000.0 * (float)bitrate / 8.0);
}
}
}