[Glitch] Add support for managing multiple stream subscriptions in a single connection

Ported ef057584fd to glitch-soc

Signed-off-by: Thibaut Girka <thib@sitedethib.com>
This commit is contained in:
Eugen Rochko 2020-08-11 18:24:59 +02:00 committed by Thibaut Girka
parent 78fa15d08f
commit 8ae52dc792
2 changed files with 311 additions and 77 deletions

View file

@ -1,3 +1,5 @@
// @ts-check
import { connectStream } from 'flavours/glitch/util/stream'; import { connectStream } from 'flavours/glitch/util/stream';
import { import {
updateTimeline, updateTimeline,
@ -19,24 +21,59 @@ import { getLocale } from 'mastodon/locales';
const { messages } = getLocale(); const { messages } = getLocale();
export function connectTimelineStream (timelineId, path, pollingRefresh = null, accept = null) { /**
* @param {number} max
* @return {number}
*/
const randomUpTo = max =>
Math.floor(Math.random() * Math.floor(max));
return connectStream (path, pollingRefresh, (dispatch, getState) => { /**
* @param {string} timelineId
* @param {string} channelName
* @param {Object.<string, string>} params
* @param {Object} options
* @param {function(Function, Function): void} [options.fallback]
* @param {function(object): boolean} [options.accept]
* @return {function(): void}
*/
export const connectTimelineStream = (timelineId, channelName, params = {}, options = {}) =>
connectStream(channelName, params, (dispatch, getState) => {
const locale = getState().getIn(['meta', 'locale']); const locale = getState().getIn(['meta', 'locale']);
let pollingId;
/**
* @param {function(Function, Function): void} fallback
*/
const useFallback = fallback => {
fallback(dispatch, () => {
pollingId = setTimeout(() => useFallback(fallback), 20000 + randomUpTo(20000));
});
};
return { return {
onConnect() { onConnect() {
dispatch(connectTimeline(timelineId)); dispatch(connectTimeline(timelineId));
if (pollingId) {
clearTimeout(pollingId);
pollingId = null;
}
}, },
onDisconnect() { onDisconnect() {
dispatch(disconnectTimeline(timelineId)); dispatch(disconnectTimeline(timelineId));
if (options.fallback) {
pollingId = setTimeout(() => useFallback(options.fallback), randomUpTo(40000));
}
}, },
onReceive (data) { onReceive (data) {
switch(data.event) { switch(data.event) {
case 'update': case 'update':
dispatch(updateTimeline(timelineId, JSON.parse(data.payload), accept)); dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept));
break; break;
case 'delete': case 'delete':
dispatch(deleteFromTimelines(data.payload)); dispatch(deleteFromTimelines(data.payload));
@ -63,17 +100,60 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null,
}, },
}; };
}); });
}
/**
* @param {Function} dispatch
* @param {function(): void} done
*/
const refreshHomeTimelineAndNotification = (dispatch, done) => { const refreshHomeTimelineAndNotification = (dispatch, done) => {
dispatch(expandHomeTimeline({}, () => dispatch(expandHomeTimeline({}, () =>
dispatch(expandNotifications({}, () => dispatch(expandNotifications({}, () =>
dispatch(fetchAnnouncements(done)))))); dispatch(fetchAnnouncements(done))))));
}; };
export const connectUserStream = () => connectTimelineStream('home', 'user', refreshHomeTimelineAndNotification); /**
export const connectCommunityStream = ({ onlyMedia } = {}) => connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); * @return {function(): void}
export const connectPublicStream = ({ onlyMedia, onlyRemote, allowLocalOnly } = {}) => connectTimelineStream(`public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`); */
export const connectHashtagStream = (id, tag, local, accept) => connectTimelineStream(`hashtag:${id}${local ? ':local' : ''}`, `hashtag${local ? ':local' : ''}&tag=${tag}`, null, accept); export const connectUserStream = () =>
export const connectDirectStream = () => connectTimelineStream('direct', 'direct'); connectTimelineStream('home', 'user', {}, { fallback: refreshHomeTimelineAndNotification });
export const connectListStream = id => connectTimelineStream(`list:${id}`, `list&list=${id}`);
/**
* @param {Object} options
* @param {boolean} [options.onlyMedia]
* @return {function(): void}
*/
export const connectCommunityStream = ({ onlyMedia } = {}) =>
connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`);
/**
* @param {Object} options
* @param {boolean} [options.onlyMedia]
* @param {boolean} [options.onlyRemote]
* @param {boolean} [options.allowLocalOnly]
* @return {function(): void}
*/
export const connectPublicStream = ({ onlyMedia, onlyRemote, allowLocalOnly } = {}) =>
connectTimelineStream(`public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`);
/**
* @param {string} columnId
* @param {string} tagName
* @param {boolean} onlyLocal
* @param {function(object): boolean} accept
* @return {function(): void}
*/
export const connectHashtagStream = (columnId, tagName, onlyLocal, accept) =>
connectTimelineStream(`hashtag:${columnId}${onlyLocal ? ':local' : ''}`, `hashtag${onlyLocal ? ':local' : ''}`, { tag: tagName }, { accept });
/**
* @return {function(): void}
*/
export const connectDirectStream = () =>
connectTimelineStream('direct', 'direct');
/**
* @param {string} listId
* @return {function(): void}
*/
export const connectListStream = listId =>
connectTimelineStream(`list:${listId}`, 'list', { list: listId });

View file

@ -1,87 +1,236 @@
// @ts-check
import WebSocketClient from '@gamestdio/websocket'; import WebSocketClient from '@gamestdio/websocket';
const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); /**
* @type {WebSocketClient | undefined}
*/
let sharedConnection;
const knownEventTypes = [ /**
'update', * @typedef Subscription
'delete', * @property {string} channelName
'notification', * @property {Object.<string, string>} params
'conversation', * @property {function(): void} onConnect
'filters_changed', * @property {function(StreamEvent): void} onReceive
]; * @property {function(): void} onDisconnect
*/
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { /**
return (dispatch, getState) => { * @typedef StreamEvent
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); * @property {string} event
const accessToken = getState().getIn(['meta', 'access_token']); * @property {object} payload
const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); */
let polling = null; /**
* @type {Array.<Subscription>}
*/
const subscriptions = [];
const setupPolling = () => { /**
pollingRefresh(dispatch, () => { * @type {Object.<string, number>}
polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); */
}); const subscriptionCounters = {};
};
const clearPolling = () => { /**
if (polling) { * @param {Subscription} subscription
clearTimeout(polling); */
polling = null; const addSubscription = subscription => {
subscriptions.push(subscription);
};
/**
* @param {Subscription} subscription
*/
const removeSubscription = subscription => {
const index = subscriptions.indexOf(subscription);
if (index !== -1) {
subscriptions.splice(index, 1);
} }
}; };
const subscription = getStream(streamingAPIBaseURL, accessToken, path, { /**
connected () { * @param {Subscription} subscription
if (pollingRefresh) { */
clearPolling(); const subscribe = ({ channelName, params, onConnect }) => {
const key = channelNameWithInlineParams(channelName, params);
subscriptionCounters[key] = subscriptionCounters[key] || 0;
if (subscriptionCounters[key] === 0) {
sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params }));
} }
subscriptionCounters[key] += 1;
onConnect(); onConnect();
};
/**
* @param {Subscription} subscription
*/
const unsubscribe = ({ channelName, params, onDisconnect }) => {
const key = channelNameWithInlineParams(channelName, params);
subscriptionCounters[key] = subscriptionCounters[key] || 1;
if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) {
sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params }));
}
subscriptionCounters[key] -= 1;
onDisconnect();
};
const sharedCallbacks = {
connected () {
subscriptions.forEach(subscription => subscribe(subscription));
},
received (data) {
const { stream } = data;
subscriptions.filter(({ channelName, params }) => {
const streamChannelName = stream[0];
if (stream.length === 1) {
return channelName === streamChannelName;
}
const streamIdentifier = stream[1];
if (['hashtag', 'hashtag:local'].includes(channelName)) {
return channelName === streamChannelName && params.tag === streamIdentifier;
} else if (channelName === 'list') {
return channelName === streamChannelName && params.list === streamIdentifier;
}
return false;
}).forEach(subscription => {
subscription.onReceive(data);
});
}, },
disconnected () { disconnected () {
if (pollingRefresh) { subscriptions.forEach(({ onDisconnect }) => onDisconnect());
polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); },
reconnected () {
subscriptions.forEach(subscription => subscribe(subscription));
},
};
/**
* @param {string} channelName
* @param {Object.<string, string>} params
* @return {string}
*/
const channelNameWithInlineParams = (channelName, params) => {
if (Object.keys(params).length === 0) {
return channelName;
} }
onDisconnect(); return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join('&')}`;
};
/**
* @param {string} channelName
* @param {Object.<string, string>} params
* @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks
* @return {function(): void}
*/
export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => {
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
const accessToken = getState().getIn(['meta', 'access_token']);
const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState);
// If we cannot use a websockets connection, we must fall back
// to using individual connections for each channel
if (!streamingAPIBaseURL.startsWith('ws')) {
const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), {
connected () {
onConnect();
}, },
received (data) { received (data) {
onReceive(data); onReceive(data);
}, },
reconnected () { disconnected () {
if (pollingRefresh) { onDisconnect();
clearPolling();
pollingRefresh(dispatch);
}
onConnect();
}, },
reconnected () {
onConnect();
},
}); });
const disconnect = () => { return () => {
if (subscription) { connection.close();
subscription.close(); };
} }
clearPolling(); const subscription = {
channelName,
params,
onConnect,
onReceive,
onDisconnect,
}; };
return disconnect; addSubscription(subscription);
// If a connection is open, we can execute the subscription right now. Otherwise,
// because we have already registered it, it will be executed on connect
if (!sharedConnection) {
sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks));
} else if (sharedConnection.readyState === WebSocketClient.OPEN) {
subscribe(subscription);
}
return () => {
removeSubscription(subscription);
unsubscribe(subscription);
}; };
} };
const KNOWN_EVENT_TYPES = [
'update',
'delete',
'notification',
'conversation',
'filters_changed',
'encrypted_message',
'announcement',
'announcement.delete',
'announcement.reaction',
];
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { /**
const params = stream.split('&'); * @param {MessageEvent} e
stream = params.shift(); * @param {function(StreamEvent): void} received
*/
const handleEventSourceMessage = (e, received) => {
received({
event: e.type,
payload: e.data,
});
};
/**
* @param {string} streamingAPIBaseURL
* @param {string} accessToken
* @param {string} channelName
* @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks
* @return {WebSocketClient | EventSource}
*/
const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => {
const params = channelName.split('&');
channelName = params.shift();
if (streamingAPIBaseURL.startsWith('ws')) { if (streamingAPIBaseURL.startsWith('ws')) {
params.unshift(`stream=${stream}`);
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
ws.onopen = connected; ws.onopen = connected;
@ -92,11 +241,19 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co
return ws; return ws;
} }
stream = stream.replace(/:/g, '/'); channelName = channelName.replace(/:/g, '/');
if (channelName.endsWith(':media')) {
channelName = channelName.replace('/media', '');
params.push('only_media=true');
}
params.push(`access_token=${accessToken}`); params.push(`access_token=${accessToken}`);
const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`);
const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join('&')}`);
let firstConnect = true; let firstConnect = true;
es.onopen = () => { es.onopen = () => {
if (firstConnect) { if (firstConnect) {
firstConnect = false; firstConnect = false;
@ -105,15 +262,12 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co
reconnected(); reconnected();
} }
}; };
for (let type of knownEventTypes) {
es.addEventListener(type, (e) => { KNOWN_EVENT_TYPES.forEach(type => {
received({ es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received));
event: e.type,
payload: e.data,
}); });
});
} es.onerror = /** @type {function(): void} */ (disconnected);
es.onerror = disconnected;
return es; return es;
}; };