Streaming Events
An example of how to use the REST API to stream sensor events in real-time.
Overview
When plotting live graphs or triggering certain alarms, a continuous stream of data is often preferred over periodic polling. This example shows how the REST API can be used to set up a stream, following some best practices and using a simple retry policy.
Preliminaries
Data Connectors: If you want to forward your data in a server-to-server integration, consider using Data Connectors for a simpler and more reliable service with an added at-least-once guarantee.
Basic Auth: For simplicity, this example uses Basic Auth for authentication. We recommend replacing it with an OAuth2 flow for integrations more complex than local experimentation.
Service Account Credentials: You must create a Service Account and know its credentials. Any role will suffice.
REST API: This example uses our REST API to interact with our cloud. See the REST API Reference for a full list of available endpoints.
Response Format
Our REST API :stream endpoints support two response formats, text/event-stream and application/json. The format is selected with the Accept header and defaults to application/json.
headers = { "Accept": "application/json" } # default
headers = { "Accept": "text/event-stream" } # alternative
Each format suits a different use case.
application/json: Returns one JSON object per event, separated by line breaks. Easy to parse in any high-level language and used in the code sample below.
text/event-stream: A format specific to Server-Sent Events. Used by Server-Sent Events libraries such as EventSource.
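To make the application/json format concrete, here is a minimal sketch of decoding a single line of the stream. The helper name parse_stream_line is hypothetical, and the sample payload contains only the fields the example code on this page reads (result, event, eventType); real events carry more fields.

```python
import json
from typing import Optional


def parse_stream_line(line: bytes) -> Optional[dict]:
    """Return the event carried by one line of an application/json stream.

    Blank lines yield None. A payload without a 'result' key is treated
    as an error, mirroring the check in the example code on this page.
    """
    if not line.strip():
        return None
    payload = json.loads(line.decode("utf-8"))
    if "result" not in payload:
        raise ValueError(f"Unexpected stream payload: {payload}")
    return payload["result"]["event"]


# Hypothetical sample line, reduced to the fields used in this example.
sample = b'{"result": {"event": {"eventType": "touch"}}}'
print(parse_stream_line(sample)["eventType"])  # touch
```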
Stream Best Practices
The following practices should be considered when implementing a stream. They are all used in the following example.
Implement a Retry Policy: The connection to a stream can be lost at any moment and for several reasons, so handle disconnects with a retry policy. The stream always disconnects after one hour, as that is how long the access token lasts. If no event has been received for the full hour, the stream disconnects with a 408 HTTP status code.
Detect Stream Disconnects Early: An optional ping_interval query parameter can be used to verify that the client can still receive messages from the server. If no ping message has been received within the specified interval, the client should reconnect to the stream.
Filter Events: By default, all event types are included in the stream. Use query parameters to reduce the number of events that need to be processed.
Authorization Header: Use the Authorization header when possible. Some libraries, such as the built-in EventSource class in browsers, do not allow setting headers. In this case, the token query parameter can be used instead.
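As a rough illustration of how these practices shape a request, the sketch below assembles the stream URL and headers using only the standard library. The function build_stream_request and its arguments are hypothetical. The parameter names event_types, ping_interval, and token come from this page, while sending event_types as a repeated query key is an assumption based on how the requests library encodes a list.

```python
import base64
from typing import Dict, Optional, Sequence, Tuple
from urllib.parse import urlencode


def build_stream_request(
    url: str,
    key_id: str = "",
    secret: str = "",
    event_types: Sequence[str] = (),
    ping_interval_s: Optional[int] = None,
    token: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Return (url_with_query, headers) for a stream request."""
    # Filter Events: one query entry per requested event type (assumed encoding).
    params = [("event_types", t) for t in event_types]
    # Detect Stream Disconnects Early: ask the server for periodic pings.
    if ping_interval_s is not None:
        params.append(("ping_interval", f"{ping_interval_s}s"))
    headers = {"Accept": "application/json"}
    if token is not None:
        # Fallback for clients that cannot set headers.
        params.append(("token", token))
    else:
        # Preferred: Basic Auth credentials in the Authorization header.
        credentials = f"{key_id}:{secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(credentials).decode("ascii")
    query = urlencode(params)
    return (f"{url}?{query}" if query else url), headers


full_url, headers = build_stream_request(
    "https://api.d21s.com/v2/projects/my-project/devices:stream",
    key_id="id",
    secret="secret",
    event_types=["touch"],
    ping_interval_s=10,
)
print(full_url)
```

The project ID my-project and the credentials are placeholders. In the full example further down, the requests library builds the query string and the Basic Auth header itself.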
Example Code
The following points summarize the provided example code.
Send a GET request to the REST API to initialize an event stream.
Keep the TCP connection open while receiving events.
If the connection is lost or too much time passes between pings, retry N times before giving up.
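The retry step can be sketched in isolation as follows. This is a simplified illustration and not the full example: run_with_retries and flaky_connect are hypothetical names, Python's built-in ConnectionError stands in for requests.exceptions.ConnectionError, and the sleep function is injectable so the demo runs without waiting.

```python
import time
from typing import Callable


def run_with_retries(
    connect: Callable[[], None],
    max_retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call connect() until it returns normally or retries run out.

    Returns the number of failed attempts before success and re-raises
    the last ConnectionError once max_retries is exhausted.
    """
    nth_retry = 0
    while True:
        try:
            connect()
            return nth_retry
        except ConnectionError:
            if nth_retry >= max_retries:
                raise
            # Exponential backoff: 1, 2, 4, ... seconds.
            sleep(2 ** nth_retry)
            nth_retry += 1


# Demo: a connection that fails twice before succeeding.
attempts = {"count": 0}


def flaky_connect() -> None:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated drop")


delays = []
failed = run_with_retries(flaky_connect, sleep=delays.append)
print(failed, delays)  # 2 [1, 2]
```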
Environment Setup
If you wish to run the code locally, make sure you have a working runtime environment.
The following packages are required by the example code.
pip install requests==2.31.0
Add environment variables for authentication details.
export DT_SERVICE_ACCOUNT_KEY_ID=<YOUR_SERVICE_ACCOUNT_KEY_ID>
export DT_SERVICE_ACCOUNT_SECRET=<YOUR_SERVICE_ACCOUNT_SECRET>
export DT_SERVICE_ACCOUNT_EMAIL=<YOUR_SERVICE_ACCOUNT_EMAIL>
export DT_PROJECT_ID=<YOUR_PROJECT_ID>
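Before running the example, it can help to confirm that the variables are visible to Python. The following is a small optional sketch. The helper missing_variables is hypothetical and checks only the three variables that the example code on this page reads.

```python
import os
from typing import List, Mapping

# Variables read by the example code on this page.
REQUIRED_VARIABLES = (
    "DT_SERVICE_ACCOUNT_KEY_ID",
    "DT_SERVICE_ACCOUNT_SECRET",
    "DT_PROJECT_ID",
)


def missing_variables(env: Mapping[str, str]) -> List[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_VARIABLES if not env.get(name)]


# Check the real environment and report anything that is missing.
missing = missing_variables(os.environ)
print("Missing:", ", ".join(missing) if missing else "none")
```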
Source
The following Python snippet implements the stream using the practices described above.
import os
import time
import json

import requests

# Service Account credentials.
SERVICE_ACCOUNT_KEY_ID = os.environ.get('DT_SERVICE_ACCOUNT_KEY_ID')
SERVICE_ACCOUNT_SECRET = os.environ.get('DT_SERVICE_ACCOUNT_SECRET')

# Construct API URL.
PROJECT_ID = os.environ.get('DT_PROJECT_ID')
BASE_URL = 'https://api.d21s.com/v2'
DEVICES_STREAM_URL = '{}/projects/{}/devices:stream'.format(
    BASE_URL,
    PROJECT_ID
)

# A few constants to control stream behavior.
MAX_CONNECTION_RETRIES = 5
PING_INTERVAL = 10
PING_JITTER = 2

if __name__ == '__main__':
    # Set up a simple catch-all retry policy.
    nth_retry = 0
    while nth_retry <= MAX_CONNECTION_RETRIES:
        try:
            print('Streaming... Press CTRL+C to terminate.')

            # Set up a stream connection.
            # The connection will time out and reconnect if nothing
            # is received in an interval of PING_INTERVAL + PING_JITTER.
            stream = requests.get(
                url=DEVICES_STREAM_URL,
                auth=(SERVICE_ACCOUNT_KEY_ID, SERVICE_ACCOUNT_SECRET),
                stream=True,
                timeout=PING_INTERVAL + PING_JITTER,
                params={
                    'event_types': [],
                    'ping_interval': '{}s'.format(PING_INTERVAL),
                },
            )

            # Iterate through the events as they come in (one per line).
            for line in stream.iter_lines():
                # Skip empty keep-alive lines, which are not valid JSON.
                if not line:
                    continue

                # Decode the response payload and break on error.
                payload = json.loads(line.decode('utf-8'))
                if 'result' not in payload:
                    raise Exception(payload)
                event = payload['result']['event']

                # Skip ping events.
                if event['eventType'] == 'ping':
                    continue

                # Print events as they arrive.
                print(f'Got {event["eventType"]} event.')

                # Reset retry counter.
                nth_retry = 0

        except KeyboardInterrupt:
            break

        except requests.exceptions.ConnectionError:
            # Raised when the connection drops or the read timeout expires.
            print('Connection lost. Reconnecting...')
            nth_retry += 1

        except Exception as e:
            print(e)

            # Print the error and try again up to MAX_CONNECTION_RETRIES.
            if nth_retry < MAX_CONNECTION_RETRIES:
                print('Something happened. Retry #{}'.format(nth_retry+1))

                # Exponential backoff in sleep time.
                time.sleep(2**nth_retry)
                nth_retry += 1
            else:
                break
Expected Output
For each new event in the stream, a line will be printed to stdout.
Streaming... Press CTRL+C to terminate.
Got touch event.
Got networkStatus event.
...