const CITIES = ['Atlanta', 'Berlin', 'Dublin', 'Boston', 'Denver', 'Tokyo', 'Singapore'];
const REGEX_SSE_LINE = /^([^:]+?)\s*:\s*(.*?)\s*$/;
async function handler(event) {
  const clientReq = event.request;
  const backendResponse = await fetch(clientReq, { backend: "origin_0" });
  const filteredStream = streamFilter(backendResponse.body, {
    delimiter: '\n\n',
    parser: parseSSE,
    validator: isValidEvent
  });
  return new Response(filteredStream, {
    headers: {
      ...backendResponse.headers,
      'cache-control': 'private, no-store'
    }
  });
}
const parseSSE = (str) => {
  return str
    .split('\n')
    .map(line => line.match(REGEX_SSE_LINE))
    .filter(matchResult => matchResult !== null)
    .reduce((out, [, k, v]) => ({
      ...out,
      [k]: (k === 'data' && v) ? JSON.parse(v) : v
    }), [])
  ;
};
const isValidEvent = (sseEvent) => {
  if (!sseEvent.data) return true;
  const city = sseEvent.data.destination;
  const shouldEmit = CITIES.includes(city);
  console.log('City: '+city+', include: '+ shouldEmit);
  return shouldEmit;
};
addEventListener("fetch", event => event.respondWith(handler(event)));
const streamFilter = (inputStream, options) => {
  let buffer = ''; 
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  const inputReader = inputStream.getReader();
  const outputStream = new ReadableStream({
    start() {
      buffer = '';
    },
    
    pull(controller) {
      
      return inputReader.read().then(({value: chunk, done: readerDone}) => {
        const chunkStr = decoder.decode(chunk);
        let events = (buffer + (chunkStr || '')).split(options.delimiter);
        
        if (!readerDone) {
          buffer = events.pop();
        }
        
        
        events
          .filter(str => options.validator(options.parser(str)))
          .forEach(str => controller.enqueue(encoder.encode(str+options.delimiter)))
        ;
        
        controller.enqueue(encoder.encode(''));
        if (readerDone) {
          controller.close();
        }
      });
    }
  });
  return outputStream;
}