Consuming Server-Sent Events (SSE) from a Python Client
This post was initially written when I was working on Integrand. It appeared in a series of posts. It performed decently; that’s why I’ve decided to plagiarize my own work. Cheers🍻.
I’ve always been fascinated by Server-Sent Events (SSE). It’s a standard/protocol describing how servers can initiate data transmission towards clients once an initial client connection has been established. The main use cases I’ve seen are as a transport in web browser-based clients, as browsers have the EventSource API built in. Unlike WebSockets, SSE is a one-way communication channel from server to client, which makes it a lighter fit when the client only needs to receive updates. I’ve been wondering about implementing SSE for server-side communication; this is how you do it using Python.
What are Server-Sent Events?
Server-Sent Events is a server push technology enabling a client to receive automatic updates from a server via an HTTP connection. Once the client establishes a connection to the server, it remains open, and the server can continue to send updates. This is particularly beneficial for applications that require real-time data but only need updates from the server.
Benefits of SSE
- Simplicity: SSE is simpler to implement compared to WebSockets, especially for scenarios requiring only server-to-client communication.
- Event-Driven: Allows sending specific types of events and handling them using event listeners.
- Text-Based Protocol: Easy to debug and implement without needing binary protocols.
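To make the text-based point concrete, here is a minimal sketch of what a stream looks like on the wire and how it breaks down into events. The payload and the helper name split_events are made up for illustration, and the sketch assumes the whole payload is already in memory with "\n" line endings, which a real client cannot assume.

```python
from typing import Dict, List

# A made-up SSE payload: two events, each terminated by a blank line.
RAW_STREAM = (
    "event: greeting\n"
    "data: hello\n"
    "\n"
    "data: first line\n"
    "data: second line\n"
    "\n"
)


def split_events(raw: str) -> List[Dict[str, str]]:
    """Split a complete SSE payload into one dict of fields per event.

    Illustrative only: no streaming, no special handling of id or retry.
    """
    events = []
    for block in raw.split("\n\n"):
        fields: Dict[str, str] = {}
        for line in block.splitlines():
            # Lines starting with a colon are comments.
            if line.startswith(":"):
                continue
            name, _, value = line.partition(":")
            # A single leading space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            # Repeated data lines are joined with a newline.
            if name == "data" and "data" in fields:
                fields["data"] += "\n" + value
            else:
                fields[name] = value
        if fields:
            events.append(fields)
    return events


for fields in split_events(RAW_STREAM):
    print(fields)
```

Because everything is plain text, you can also inspect a stream with nothing more than curl or a debugger.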
Setting Up a Python Client to Consume SSE
To consume SSE from a Python client, you can use libraries like sseclient or httpx. In our example, we will use our own client (and the requests library) to quickly stand up an example.
Prerequisites
- Python 3.x
- requests library
Install the necessary library:
pip install requests
Code Example
Create the Python Scripts:
Create two Python files, sse_client.py and main.py.
Create the SSE Client Class:
In sse_client.py, we will define our SSE client class. This SSE client was taken and modified from the following repo: https://github.com/mpetazzoni/sseclient. It provides a nice wrapper around our response object from requests.
import logging
from typing import Any, Generator, Optional

_FIELD_SEPARATOR = ':'


class Event(object):
    """Representation of an event from the event stream."""

    def __init__(self, id: Optional[str] = None, event: str = 'message',
                 data: str = '', retry: Optional[Any] = None):
        self.id = id
        self.event = event
        self.data = data
        self.retry = retry

    def __str__(self) -> str:
        s = '{0} event'.format(self.event)
        if self.id:
            s += ' #{0}'.format(self.id)
        if self.data:
            # Pluralize "byte" unless the payload is exactly one byte long.
            s += ', {0} byte{1}'.format(len(self.data),
                                        's' if len(self.data) != 1 else '')
        else:
            s += ', no data'
        if self.retry:
            s += ', retry in {0}ms'.format(self.retry)
        return s


class SSEClient(object):
    """Implementation of a SSE client.

    See http://www.w3.org/TR/2009/WD-eventsource-20091029/ for the
    specification.
    """

    def __init__(self, event_source: Generator[bytes, None, None],
                 char_enc: str = 'utf-8'):
        """Initialize the SSE client over an existing, ready to consume
        event source.

        The event source is expected to be a binary stream and have a close()
        method. That would usually be something that implements
        io.BinaryIOBase, like an httplib or urllib3 HTTPResponse object.
        """
        self._logger = logging.getLogger(self.__class__.__module__)
        self._logger.debug('Initialized SSE client from event source %s',
                           event_source)
        self._event_source = event_source
        self._char_enc = char_enc

    def _read(self):
        """Read the incoming event source stream and yield event chunks.

        Unfortunately it is possible for some servers to decide to break an
        event into multiple HTTP chunks in the response. It is thus necessary
        to correctly stitch together consecutive response chunks and find the
        SSE delimiter (empty new line) to yield full, correct event chunks."""
        data = b''
        for chunk in self._event_source:
            for line in chunk.splitlines(True):
                data += line
                if data.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')):
                    yield data
                    data = b''
        if data:
            yield data

    def events(self) -> Generator[Event, None, None]:
        for chunk in self._read():
            event = Event()
            # Split before decoding so splitlines() only uses \r and \n
            for line in chunk.splitlines():
                # Decode the line.
                line = line.decode(self._char_enc)

                # Lines starting with a separator are comments and are to be
                # ignored.
                if not line.strip() or line.startswith(_FIELD_SEPARATOR):
                    continue

                data = line.split(_FIELD_SEPARATOR, 1)
                field = data[0]

                # Ignore unknown fields.
                if field not in event.__dict__:
                    self._logger.debug('Saw invalid field %s while parsing '
                                       'Server Side Event', field)
                    continue

                if len(data) > 1:
                    # From the spec:
                    # "If value starts with a single U+0020 SPACE character,
                    # remove it from value."
                    if data[1].startswith(' '):
                        value = data[1][1:]
                    else:
                        value = data[1]
                else:
                    # If no value is present after the separator,
                    # assume an empty value.
                    value = ''

                # The data field may come over multiple lines and their values
                # are concatenated with each other.
                if field == 'data':
                    event.__dict__[field] += value + '\n'
                else:
                    event.__dict__[field] = value

            # Events with no data are not dispatched.
            if not event.data:
                continue

            # If the data field ends with a newline, remove it.
            if event.data.endswith('\n'):
                event.data = event.data[0:-1]

            # Empty event names default to 'message'
            event.event = event.event or 'message'

            # Dispatch the event
            self._logger.debug('Dispatching %s...', event)
            yield event

    def close(self) -> None:
        """Manually close the event source stream."""
        self._event_source.close()
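The stitching that _read performs is easier to see in isolation. Below is a rough standalone copy of that buffering idea, written as a free function so it runs without the class. The name stitch_events and the chunk boundaries are hypothetical; they only simulate a server splitting events across HTTP chunks.

```python
from typing import Iterable, Iterator


def stitch_events(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Buffer incoming chunks and yield one complete event block at a time."""
    buffer = b""
    for chunk in chunks:
        # keepends=True preserves line endings, which we need to spot
        # the blank line that terminates an event.
        for line in chunk.splitlines(True):
            buffer += line
            if buffer.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
                yield buffer
                buffer = b""
    # Flush whatever is left if the stream ends without a delimiter.
    if buffer:
        yield buffer


# Hypothetical chunk boundaries that cut both events in half.
chunks = [b"data: hel", b"lo\n\ndata: wor", b"ld\n\n"]
complete = list(stitch_events(chunks))
print(complete)
```

Each yielded block contains exactly one event, regardless of where the chunk boundaries fell.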
Create Request to Consume SSE Connection
In main.py, we’ll initiate an HTTP request. Then we use our new SSEClient class to wrap the response and take advantage of streaming.
import requests

from sse_client import SSEClient

if __name__ == "__main__":
    url = 'http://localhost:8000/events'
    headers = {
        'Accept': 'text/event-stream'
    }
    # stream=True keeps the connection open instead of buffering the whole body.
    response = requests.get(url, stream=True, headers=headers)
    # response.raise_for_status() <- Add this if you want http errors to raise exceptions
    sseClient = SSEClient(response)
    for event in sseClient.events():
        print(event.data)
Handling Different Event Types
SSE supports different types of events. By default, all messages are considered message events. You can also send specific types of events by specifying an event field in the data sent by the server.
For example, if your server sends a custom event type:
event: customEvent
data: {"key": "value"}
You can handle it in your Python client:
for event in sseClient.events():
    if event.event == "customEvent":
        print(f"Custom Event received: {event.data}")
    else:
        print(f"Event received: {event.data}")
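Once you have more than a couple of event types, an if/else chain gets unwieldy. One possible alternative is a small handler registry keyed by event name. This is only a sketch: HANDLERS, dispatch, and the handler functions are hypothetical names, and the JSON payload shape is assumed from the customEvent example above.

```python
import json
from typing import Callable, Dict, List

# Collected results, so the effect of each handler is easy to inspect.
received: List[str] = []


def on_custom_event(data: str) -> None:
    # Assumes the server sends JSON with a "key" field, as in the example.
    payload = json.loads(data)
    received.append(f"custom: {payload['key']}")


def on_message(data: str) -> None:
    received.append(f"message: {data}")


# Hypothetical registry mapping event names to handler functions.
HANDLERS: Dict[str, Callable[[str], None]] = {
    "customEvent": on_custom_event,
    "message": on_message,
}


def dispatch(event_name: str, data: str) -> None:
    handler = HANDLERS.get(event_name)
    if handler is None:
        return  # Unknown event types are silently ignored in this sketch.
    handler(data)


dispatch("customEvent", '{"key": "value"}')
dispatch("message", "plain text")
dispatch("unknownEvent", "ignored")
print(received)
```

In the real loop, you would call dispatch(event.event, event.data) for each event yielded by the client.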
Wrapping up
Consuming Server-Sent Events from a Python client is straightforward with minimal libraries. SSE provides an efficient way to receive real-time updates from a server without the complexity of WebSockets or having to use something like Kafka. To get going with your Python SSE client, set up an SSE server to emit your messages. Learn about how to set up the SSE server in Go.
By following this guide, you can set up your Python client to handle SSE and custom event types. From there, you can add reconnection logic to make your real-time communication more robust and reliable.
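The client above does not reconnect on its own, so here is one rough way reconnection could look. In the SSE protocol, a reconnecting client sends the ID of the last event it saw in a Last-Event-ID header so the server can resume from there. Everything else in this sketch is an assumption: stream_with_reconnect, the connect callable, the (id, data) tuples standing in for Event objects, and the fixed delay are all hypothetical. With requests you would likely catch requests.exceptions.RequestException rather than the built-in ConnectionError.

```python
import time
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple

# (event_id, data) pairs stand in for the Event objects from sse_client.py.
SimpleEvent = Tuple[Optional[str], str]


def stream_with_reconnect(
    connect: Callable[[Dict[str, str]], Iterable[SimpleEvent]],
    max_attempts: int = 3,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[SimpleEvent]:
    """Yield events, reconnecting with Last-Event-ID after a dropped connection."""
    last_event_id: Optional[str] = None
    failures = 0
    while failures < max_attempts:
        headers = {"Accept": "text/event-stream"}
        if last_event_id is not None:
            headers["Last-Event-ID"] = last_event_id
        try:
            for event_id, data in connect(headers):
                failures = 0  # The connection works, so reset the counter.
                if event_id is not None:
                    last_event_id = event_id
                yield event_id, data
            return  # Assumption: a cleanly ended stream means we are done.
        except ConnectionError:
            failures += 1
            sleep(delay_seconds)


# Simulated server: drops the first connection after one event.
calls: List[Dict[str, str]] = []


def fake_connect(headers: Dict[str, str]) -> Iterator[SimpleEvent]:
    calls.append(dict(headers))
    if len(calls) == 1:
        yield ("1", "first")
        raise ConnectionError("simulated drop")
    yield ("2", "second")


events = list(stream_with_reconnect(fake_connect, sleep=lambda seconds: None))
print(events)
print(calls[1])
```

A server can also suggest a reconnection delay through the retry field, which the Event class already exposes; a fuller version could use that value instead of a fixed delay.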