Consuming Server-Sent Events (SSE) from a Python Client

This post was initially written when I was working on Integrand. It appeared in a series of posts. It performed decently; that’s why I’ve decided to plagiarize my own work. Cheers🍻.

I’ve always been fascinated by Server-Sent Events (SSE). It’s a standard describing how servers can push data to clients once an initial client connection has been established. The main use case I’ve seen is as a transport for browser-based clients, since browsers have the EventSource API built in. Unlike WebSockets, SSE is a one-way channel from server to client, which makes it a lighter fit when the client only needs to receive updates. I’ve been wondering about using SSE for server-side communication; this is how you do it using Python.

What are Server-Sent Events?

Server-Sent Events is a server push technology enabling a client to receive automatic updates from a server via an HTTP connection. Once the client establishes a connection to the server, it remains open, and the server can continue to send updates. This is particularly beneficial for applications that require real-time data but only need updates from the server.

Benefits of SSE

  1. Simplicity: SSE is simpler to implement compared to WebSockets, especially for scenarios requiring only server-to-client communication.
  2. Event-Driven: Allows sending specific types of events and handling them using event listeners.
  3. Text-Based Protocol: Easy to debug and implement without needing binary protocols.
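
To make the text-based format concrete, here is a minimal sketch of how a single event could be serialized on the wire. The `format_sse_event` helper is a hypothetical name used only for illustration; the field names (`id`, `event`, `data`) and the blank line that ends an event come from the SSE format itself.

```python
from typing import Optional


def format_sse_event(data: str, event: Optional[str] = None,
                     event_id: Optional[str] = None) -> str:
    """Serialize one event in the text/event-stream format (illustrative helper)."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Multi-line payloads are sent as one "data:" line per payload line.
    for payload_line in data.split("\n"):
        lines.append(f"data: {payload_line}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse_event('{"key": "value"}', event="customEvent", event_id="1"), end="")
```

Because everything is plain text, you can inspect a stream like this with curl or a simple print statement while debugging.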

Setting Up a Python Client to Consume SSE

To consume SSE from a Python client, you can use a library such as sseclient, or parse the stream yourself on top of an HTTP client such as requests or httpx. In this example we’ll write our own small client on top of requests.

Prerequisites

  • Python 3.x
  • requests library

Install the necessary library:

pip install requests

Code Example

  1. Create the Python Scripts:

    Create two Python files, sse_client.py and main.py.

  2. Create the SSE Client Class:

    In sse_client.py, we will define our SSE client class. This SSE client was taken and modified from the following repo: https://github.com/mpetazzoni/sseclient. It provides a nice wrapper around the response object from requests.

    import logging
    from typing import Any, Generator, Optional
    
    
    _FIELD_SEPARATOR = ':'
    
    
    class Event(object):
        """Representation of an event from the event stream."""
    
    
        def __init__(self, id:Optional[str]=None, event:str='message', data:str='', retry: Optional[Any]=None):
            self.id = id
            self.event = event
            self.data = data
            self.retry = retry
    
    
        def __str__(self) -> str:
            s = '{0} event'.format(self.event)
            if self.id:
                s += ' #{0}'.format(self.id)
            if self.data:
                s += ', {0} byte{1}'.format(len(self.data),
                                            's' if len(self.data) != 1 else '')
            else:
                s += ', no data'
            if self.retry:
                s += ', retry in {0}ms'.format(self.retry)
            return s
    
    
    class SSEClient(object):
        """Implementation of a SSE client.
    
    
        See http://www.w3.org/TR/2009/WD-eventsource-20091029/ for the
        specification.
        """
    
    
        def __init__(self, event_source: Generator[bytes, None, None], char_enc: str='utf-8'):
            """Initialize the SSE client over an existing, ready to consume
            event source.
    
    
            The event source is expected to be a binary stream and have a close()
            method. That would usually be something that implements
            io.BinaryIOBase, like an httplib or urllib3 HTTPResponse object.
            """
            self._logger = logging.getLogger(self.__class__.__module__)
            self._logger.debug('Initialized SSE client from event source %s',
                            event_source)
            self._event_source = event_source
            self._char_enc = char_enc
    
    
        def _read(self):
            """Read the incoming event source stream and yield event chunks.
    
    
            Unfortunately it is possible for some servers to decide to break an
            event into multiple HTTP chunks in the response. It is thus necessary
            to correctly stitch together consecutive response chunks and find the
            SSE delimiter (empty new line) to yield full, correct event chunks."""
            data = b''
            for chunk in self._event_source:
                for line in chunk.splitlines(True):
                    data += line
                    if data.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')):
                        yield data
                        data = b''
            if data:
                yield data
    
    
        def events(self) -> Generator[Event, None, None]:
            for chunk in self._read():
                event = Event()
                # Split before decoding so splitlines() only uses \r and \n
                for line in chunk.splitlines():
                    # Decode the line.
                    line = line.decode(self._char_enc)
    
    
                    # Lines starting with a separator are comments and are to be
                    # ignored.
                    if not line.strip() or line.startswith(_FIELD_SEPARATOR):
                        continue
    
    
                    data = line.split(_FIELD_SEPARATOR, 1)
                    field = data[0]
    
    
                    # Ignore unknown fields.
                    if field not in event.__dict__:
                        self._logger.debug('Saw invalid field %s while parsing '
                                        'Server Side Event', field)
                        continue
    
    
                    if len(data) > 1:
                        # From the spec:
                        # "If value starts with a single U+0020 SPACE character,
                        # remove it from value."
                        if data[1].startswith(' '):
                            value = data[1][1:]
                        else:
                            value = data[1]
                    else:
                        # If no value is present after the separator,
                        # assume an empty value.
                        value = ''
    
    
                    # The data field may come over multiple lines and their values
                    # are concatenated with each other.
                    if field == 'data':
                        event.__dict__[field] += value + '\n'
                    else:
                        event.__dict__[field] = value
    
    
                # Events with no data are not dispatched.
                if not event.data:
                    continue
    
    
                # If the data field ends with a newline, remove it.
                if event.data.endswith('\n'):
                    event.data = event.data[0:-1]
    
    
                # Empty event names default to 'message'
                event.event = event.event or 'message'
    
    
                # Dispatch the event
                self._logger.debug('Dispatching %s...', event)
                yield event
    
    
        def close(self) -> None:
            """Manually close the event source stream."""
            self._event_source.close()
    
  3. Create the Request to Consume the SSE Connection:

Using the file called main.py, we’ll initiate an HTTP request. Then we wrap the response in our new SSEClient class to take advantage of streaming.

import requests

from sse_client import SSEClient

if __name__ == "__main__":
    url = 'http://localhost:8000/events'
    headers = {
        'Accept': 'text/event-stream'
    }
    # stream=True keeps the connection open instead of buffering the whole body
    response = requests.get(url, stream=True, headers=headers)
    # response.raise_for_status()  # Uncomment to raise exceptions on HTTP errors
    sseClient = SSEClient(response)

    for event in sseClient.events():
        print(event.data)
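
The client rarely receives events as neat units: the HTTP layer hands back arbitrary byte chunks, and the `_read` method from step 2 stitches them back together. The standalone sketch below mirrors that logic on hard-coded chunks so it can be run without a server. The function name `iter_event_blocks` and the sample chunks are made up for illustration.

```python
from typing import Iterable, Iterator


def iter_event_blocks(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Reassemble complete event blocks from arbitrarily split byte chunks."""
    data = b''
    for chunk in chunks:
        # keepends=True preserves the line terminators we need to detect.
        for line in chunk.splitlines(True):
            data += line
            # A blank line (two consecutive line endings) ends an event.
            if data.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')):
                yield data
                data = b''
    # Flush whatever is left when the stream ends without a blank line.
    if data:
        yield data


if __name__ == "__main__":
    # Two events, deliberately split in the middle of their payloads.
    chunks = [b'data: hel', b'lo\n\ndata: wor', b'ld\n\n']
    for block in iter_event_blocks(chunks):
        print(block)
```

Running it prints one complete block per event, even though neither event arrived in a single chunk.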

Handling Different Event Types

SSE supports different types of events. By default, all messages are considered message events. You can also send specific types of events by specifying an event field in the data sent by the server.

For example, if your server sends a custom event type:

event: customEvent
data: {"key": "value"}

You can handle it in your Python client:

for event in sseClient.events():
    if event.event == "customEvent":
        print(f"Custom Event received: {event.data}")
    else:
        print(f"Event received: {event.data}")
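
Once there are more than a couple of event types, the if/else chain can be swapped for a lookup table. This is only one possible sketch: `FakeEvent`, `HANDLERS`, `dispatch`, and the handler functions are hypothetical names, and `FakeEvent` stands in for the Event class from sse_client.py so the snippet runs on its own.

```python
from collections import namedtuple

# Stand-in for the Event class from sse_client.py, so this runs on its own.
FakeEvent = namedtuple("FakeEvent", ["event", "data"])


def on_custom(event):
    return f"Custom Event received: {event.data}"


def on_message(event):
    return f"Event received: {event.data}"


# Map event names to handler functions.
HANDLERS = {"customEvent": on_custom, "message": on_message}


def dispatch(event):
    # Unknown event names fall back to the default message handler.
    handler = HANDLERS.get(event.event, on_message)
    return handler(event)


if __name__ == "__main__":
    stream = [FakeEvent("customEvent", '{"key": "value"}'), FakeEvent("message", "hello")]
    for ev in stream:
        print(dispatch(ev))
```

With the real client, the loop would iterate over `sseClient.events()` instead of the hard-coded list.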

Wrapping up

Consuming Server-Sent Events from a Python client is straightforward and needs only minimal libraries. SSE provides an efficient way to receive real-time updates from a server without the complexity of WebSockets or having to use something like Kafka. To get going with your Python SSE client, set up an SSE server to emit your messages. Learn about how to set up the SSE server in Go.

By following this guide, you can set up your Python client to handle SSE and custom event types; reconnection logic can then be layered on top to make the real-time connection more robust.
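
The client class above does not reconnect on its own, so here is a rough sketch of what a retry loop could look like. Everything in it is an assumption for illustration: `stream_with_reconnect` and the `connect` callback are hypothetical names, events are simplified to `(id, data)` pairs, and it catches the built-in ConnectionError. With requests you would likely catch requests.exceptions.RequestException instead, and `connect` would send the last seen id in a `Last-Event-ID` request header.

```python
import time
from typing import Callable, Iterable, Iterator, Optional, Tuple

# (event id or None, data) pairs stand in for parsed events in this sketch.
EventPair = Tuple[Optional[str], str]


def stream_with_reconnect(
    connect: Callable[[Optional[str]], Iterable[EventPair]],
    max_retries: int = 3,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[EventPair]:
    """Yield events from connect(last_id), reconnecting after connection errors."""
    last_id: Optional[str] = None
    failures = 0
    while True:
        try:
            for event_id, data in connect(last_id):
                failures = 0  # a received event shows the connection works
                if event_id is not None:
                    last_id = event_id  # remember where to resume from
                yield event_id, data
            return  # the server closed the stream cleanly; stop here
        except ConnectionError:
            failures += 1
            if failures > max_retries:
                raise  # give up after too many consecutive failures
            sleep(delay_seconds)
```

Stopping on a clean close is a design choice made for this sketch; a long-running consumer might prefer to reconnect in that case as well.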
