const CITIES = ['Atlanta', 'Berlin', 'Dublin', 'Boston', 'Denver', 'Tokyo', 'Singapore'];
const REGEX_SSE_LINE = /^([^:]+?)\s*:\s*(.*?)\s*$/;
async function handler(event) {
const clientReq = event.request;
const backendResponse = await fetch(clientReq, { backend: "origin_0" });
const filteredStream = streamFilter(backendResponse.body, {
delimiter: '\n\n',
parser: parseSSE,
validator: isValidEvent
});
return new Response(filteredStream, {
headers: {
...backendResponse.headers,
'cache-control': 'private, no-store'
}
});
}
const parseSSE = (str) => {
return str
.split('\n')
.map(line => line.match(REGEX_SSE_LINE))
.filter(matchResult => matchResult !== null)
.reduce((out, [, k, v]) => ({
...out,
[k]: (k === 'data' && v) ? JSON.parse(v) : v
}), [])
;
};
const isValidEvent = (sseEvent) => {
if (!sseEvent.data) return true;
const city = sseEvent.data.destination;
const shouldEmit = CITIES.includes(city);
console.log('City: '+city+', include: '+ shouldEmit);
return shouldEmit;
};
addEventListener("fetch", event => event.respondWith(handler(event)));
const streamFilter = (inputStream, options) => {
let buffer = '';
const decoder = new TextDecoder();
const encoder = new TextEncoder();
const inputReader = inputStream.getReader();
const outputStream = new ReadableStream({
start() {
buffer = '';
},
pull(controller) {
return inputReader.read().then(({value: chunk, done: readerDone}) => {
const chunkStr = decoder.decode(chunk);
let events = (buffer + (chunkStr || '')).split(options.delimiter);
if (!readerDone) {
buffer = events.pop();
}
events
.filter(str => options.validator(options.parser(str)))
.forEach(str => controller.enqueue(encoder.encode(str+options.delimiter)))
;
controller.enqueue(encoder.encode(''));
if (readerDone) {
controller.close();
}
});
}
});
return outputStream;
}