# Receiving Events

## Request Contents

Events are delivered to the receiving endpoint as HTTPS POST requests.

Both the headers and body of the incoming request contain information of interest that can be extracted and, depending on the [configuration](https://docs.developer.disruptive-technologies.com/data-connectors/advanced-configurations), used to verify the content and origin of the request.

{% hint style="info" %}
**Exploring the request contents**

You can use [webhook.site](https://webhook.site) to explore the contents of the POST requests. This site will generate a URL that you can point a Data Connector to, and it will display all the relevant details about each request.

Note that using this service will make the events publicly available through the URL generated by that site. We recommend exploring this with emulators in a separate project in DT Studio.
{% endhint %}

### Header

If a [signature secret](https://docs.developer.disruptive-technologies.com/advanced-configurations#signing-events) is set in the Data Connector configuration, the following header will be included.

* **X-Dt-Signature**\
  Includes a [JSON Web Token](https://jwt.io/) (JWT). Once decoded using the signature secret, it contains all the necessary information to verify the request content and origin.

Note that depending on the framework used to receive the Data Connector events, the header name casing may differ. Some services will force header names to be lower-cased, like `x-dt-signature`.
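
If your framework exposes headers as a plain dictionary, a case-insensitive lookup avoids depending on a particular casing. The following is a minimal sketch, assuming the headers arrive as a mapping of strings; `get_header` is a hypothetical helper and not part of any DT library.

```python
from typing import Mapping, Optional


def get_header(headers: Mapping[str, str], name: str) -> Optional[str]:
    """Return the value of header `name`, ignoring case, or None if absent."""
    wanted = name.lower()
    for key, value in headers.items():
        if key.lower() == wanted:
            return value
    return None


# Both casings resolve to the same value.
token_a = get_header({"X-Dt-Signature": "<JWT>"}, "x-dt-signature")
token_b = get_header({"x-dt-signature": "<JWT>"}, "X-Dt-Signature")
```

Frameworks such as Flask already treat header names case-insensitively, so a helper like this is only relevant when working with raw dictionaries.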

Read more about validating the request signature in the [Verifying Signed Events](#verifying-signed-events) section below.

### Body

The request body contains three fields, `event`, `labels`, and `metadata`. The following snippet shows an example request body of a `touch` event for a `humidity` sensor forwarded by a Data Connector.

```javascript
{
    "event": {
        "eventId": "<EVENT_ID>",
        "targetName": "projects/<PROJECT_ID>/devices/<DEVICE_ID>",
        "eventType": "touch",
        "data": {
            "touch": {
                "updateTime": "2021-05-28T08:34:06.225872Z"
            }
        },
        "timestamp": "2021-05-28T08:34:06.225872Z"
    },
    "labels": {
        "room-number": "99"
    },
    "metadata": {
        "deviceId": "<DEVICE_ID>",
        "projectId": "<PROJECT_ID>",
        "deviceType": "humidity",
        "productNumber": "102081"
    }
}
```

<table data-header-hidden><thead><tr><th width="194.4794101688501">Field</th><th width="150">Type</th><th>Description</th></tr></thead><tbody><tr><td>Field</td><td>Type</td><td>Description</td></tr><tr><td><code>event</code></td><td><code>struct</code></td><td>Contains event data. See the <a href="../../concepts/events#structure">Event</a> documentation where the structure is explained in detail for each event type.</td></tr><tr><td><code>labels</code></td><td><code>struct</code></td><td>Device label key-value pairs included by the Data Connector. See the <a href="../advanced-configurations#including-labels">Advanced Configuration</a> page for details about including labels.</td></tr><tr><td><code>metadata</code></td><td><code>struct</code></td><td>Contains metadata about the device that is the source of the event. See the section below for more details.</td></tr></tbody></table>

#### Event Metadata

Each event has a `metadata` field that includes details about the device that is the source of the event. The event metadata has the following structure.

| Field           | Type     | Description                                                                                                                                 |
| --------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| `deviceId`      | `string` | The identifier of the device that published the event.                                                                                      |
| `projectId`     | `string` | The identifier of the project the device is in.                                                                                             |
| `deviceType`    | `string` | The [device type](https://docs.developer.disruptive-technologies.com/concepts/devices#device-types) that published the event.               |
| `productNumber` | `string` | The [product number](https://docs.developer.disruptive-technologies.com/concepts/devices#structure) of the device that published the event. |

{% hint style="info" %}
**Note**

The structure of the `metadata` field might change in the future if event types are added that are not published by devices. Before processing the `metadata` field, first check `event.eventType` to confirm that the event is a known device event.

See the code sample below for an example of how to do this.
{% endhint %}

The event metadata makes it possible to check which [device type](https://docs.developer.disruptive-technologies.com/concepts/devices#device-types) has published the event, even for event types like `touch` or `networkStatus` which are published by many types of devices. This makes it possible to add new devices to a database without having to first look up the device using the REST API.

The metadata also provides a more convenient way to get the `deviceId` and `projectId` of the device that published the event, without having to parse the `event.targetName` field.
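
To illustrate the difference, the sketch below reads the identifiers from `metadata` after checking the event type, and compares this with parsing `event.targetName`. The helper names, the `KNOWN_DEVICE_EVENTS` set, and the `p1`/`d1` identifiers are made up for this example.

```python
from typing import Optional, Tuple

# Assumption: the device event types this receiver knows how to handle.
KNOWN_DEVICE_EVENTS = {"touch", "networkStatus"}


def ids_from_target_name(target_name: str) -> Tuple[str, str]:
    """Parse 'projects/<PROJECT_ID>/devices/<DEVICE_ID>' into its two IDs."""
    parts = target_name.split("/")
    if len(parts) != 4 or parts[0] != "projects" or parts[2] != "devices":
        raise ValueError(f"unexpected targetName: {target_name!r}")
    return parts[1], parts[3]


def ids_from_metadata(body: dict) -> Optional[Tuple[str, str]]:
    """Return (projectId, deviceId) from metadata for known device events."""
    if body["event"]["eventType"] not in KNOWN_DEVICE_EVENTS:
        return None  # Unknown event type: do not assume the metadata layout.
    metadata = body["metadata"]
    return metadata["projectId"], metadata["deviceId"]


# Trimmed-down request body with placeholder identifiers.
body = {
    "event": {
        "targetName": "projects/p1/devices/d1",
        "eventType": "touch",
    },
    "metadata": {"deviceId": "d1", "projectId": "p1", "deviceType": "humidity"},
}

from_metadata = ids_from_metadata(body)
from_target_name = ids_from_target_name(body["event"]["targetName"])
```

Both approaches yield the same identifiers, but the metadata lookup needs no string parsing.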

## Implementing Your Endpoint

In order to receive events from a Data Connector, your server needs to be set up to do the following:

* Start listening for incoming requests on the URL specified in the Data Connector's Endpoint URL.
* Read the HTTP headers and body, and process the event.
* Reply with a **2xx** status code in a timely manner. Any response code outside the 2xx range will be considered a failed delivery and will be [retried](#retry-policy).

If your endpoint fails to reply with status codes in the 2xx range consistently for an extended time period, the Data Connector will eventually be [automatically disabled](https://docs.developer.disruptive-technologies.com/advanced-configurations#auto-disabled-data-connectors).

This server can be written in any programming language. Example implementations in a selection of languages can be found on the [Example Integrations](https://docs.developer.disruptive-technologies.com/data-connectors/example-integrations) page, as well as in the [Verifying Signed Events](#verifying-signed-events) section on this page.

Regardless of which language is used to implement the server, there are a few things to keep in mind:

#### Server Configuration

The server needs to be set up to listen for HTTPS POST requests on the URL specified in the Data Connector's Endpoint URL. This endpoint needs to be publicly available and not require any authorization. You can verify that the event originates from DT by following the steps in the Verifying Signed Events section.

The server also needs to be configured with a valid SSL certificate that is issued from one of the root certificates from [Mozilla's CA Certificate Program](https://wiki.mozilla.org/CA). This will be the case for most certificates (e.g. certificates issued by Let's Encrypt, DigiCert, or GlobalSign). Self-signed certificates are not supported.

{% hint style="info" %}
**SSL Certificate Verification**

You can verify that you have a valid SSL cert by running `curl -v {YOUR_ENDPOINT}` in your terminal and looking for the string "SSL certificate verify ok".

To verify that your SSL cert is issued by one of the root certs in Mozilla's CA Certificate Program, you can spin up an Alpine docker container locally, and check your SSL cert from that container. Alpine uses the root certs in Mozilla's CA Certificate Program to validate SSL certificates. Run the following commands in your terminal (assuming Docker is installed):

1. `docker run --rm -ti alpine:latest sh`
2. `apk --update add ca-certificates curl`
3. `curl -v {YOUR_ENDPOINT}`
4. Make sure the "SSL certificate verify ok" string is present in the output
5. Use ctrl+d to quit the container

To help diagnose any SSL issues, you can use the [SSL Server Test](https://www.ssllabs.com/ssltest/analyze.html) from SSL Labs.
{% endhint %}

#### Handle Incoming Event

Each event will be delivered to your endpoint separately as individual requests. When processing a request, you should do the following steps:

1. (Recommended) Verify the integrity of the event, and that it originates from DT (see [Verifying Signed Events](#verifying-signed-events) below).
2. Do something with the event. You might, for example, put it on a separate queue for later processing (Google Pub/Sub, Amazon SQS, Azure Service Bus, etc.), write it to a database, or do some other processing.
3. Respond with a status code in the 2xx range once you've accepted/processed the event.

Complete all of these steps and respond within 10 seconds. If your endpoint takes longer than 10 seconds to respond, the request may time out and be retried according to the [Retry Policy](#retry-policy).

If you think there's a possibility that your processing might take more than 10 seconds, prefer to write the event to a separate queue or a database and do the processing at a later point in time.
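
The following sketch shows the general shape of this pattern: the request handler only stores the event and acknowledges it, while a separate worker does the slow processing. It uses an in-process `queue.Queue` as a stand-in for an external queue or database, and `accept_event` and `worker` are hypothetical names chosen for this example.

```python
import json
import queue
import threading

# In-process stand-in for an external queue or database.
events = queue.Queue()
processed = []


def accept_event(raw_body: bytes) -> int:
    """Store the event for later processing and return the HTTP status code."""
    events.put(json.loads(raw_body))
    return 200  # Acknowledge right away; the slow work happens elsewhere.


def worker() -> None:
    """Process queued events one at a time, independent of request handling."""
    while True:
        event = events.get()
        if event is None:  # Sentinel used to stop the worker.
            events.task_done()
            break
        # Slow processing would go here; this sketch only records the ID.
        processed.append(event["event"]["eventId"])
        events.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

status = accept_event(b'{"event": {"eventId": "abc", "eventType": "touch"}}')
events.put(None)  # Stop the worker once the queue has drained.
thread.join()
```

An in-memory queue loses its contents if the process stops after the event has been acknowledged, so a durable queue or database is the safer choice outside of experiments.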

## Handling Duplicates

Every event received by DT Cloud is put in a [dedicated, per-Data Connector queue](https://docs.developer.disruptive-technologies.com/introduction-to-data-connector#at-least-once-guarantee). Messages are removed from this queue once acknowledged, or if the message is older than 12 hours.

A side effect of this delivery guarantee is that, under certain conditions, **you may receive duplicates** of the same event. While rare, deduplication should be performed on the receiving end by checking event IDs.

{% hint style="success" %}
**Best Practice**

Use the included **eventId** field to check for duplicated events.
{% endhint %}
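
A minimal sketch of such a check is shown below. It keeps recently seen event IDs in memory and forgets them after 12 hours, on the assumption that duplicates can only arrive while the event is still in the queue. The `EventDeduplicator` class is hypothetical and only illustrates the idea.

```python
import time
from collections import OrderedDict
from typing import Optional

# Events older than 12 hours are removed from the queue and not redelivered.
RETENTION_SECONDS = 12 * 60 * 60


class EventDeduplicator:
    """Remembers recently seen event IDs and reports repeats."""

    def __init__(self, retention: float = RETENTION_SECONDS) -> None:
        self._retention = retention
        # Maps event ID to the time it was first seen, oldest first.
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def is_duplicate(self, event_id: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Drop IDs that are older than the retention window.
        while self._seen:
            _oldest_id, seen_at = next(iter(self._seen.items()))
            if now - seen_at <= self._retention:
                break
            self._seen.popitem(last=False)
        if event_id in self._seen:
            return True
        self._seen[event_id] = now
        return False


dedup = EventDeduplicator()
first = dedup.is_duplicate("evt-1", now=0.0)
second = dedup.is_duplicate("evt-1", now=60.0)
expired = dedup.is_duplicate("evt-1", now=60.0 + RETENTION_SECONDS + 1)
```

In-memory state is lost on restart and is not shared between server instances, so a deployment with several instances would need a shared store, for example a unique constraint on the event ID column in a database.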

## Retry Policy

Any time a Data Connector does not receive a successful response (**HTTP status code 2xx**), the event will be retried. If an event has not been successfully acknowledged after 12 hours, it will be discarded.

The retry interval is calculated as an exponential backoff policy, given by

$$
t_0 \cdot 2^{n-1},
$$

where $$t_0$$ is the initial interval of 8 seconds and $$n$$ is the attempt number. The interval will not exceed 1 hour. For very slow endpoints, the minimum retry interval will be 4 times the response time.

The following table shows the retry interval after a given number of delivery attempts:

<table data-header-hidden><thead><tr><th width="332">Attempt</th><th>Retry Interval [s]</th></tr></thead><tbody><tr><td>Attempt</td><td>Retry Interval [s]</td></tr><tr><td>1</td><td>8</td></tr><tr><td>2</td><td>16</td></tr><tr><td>3</td><td>32</td></tr><tr><td>...</td><td>...</td></tr><tr><td>9</td><td>2048</td></tr><tr><td>10</td><td>3600</td></tr><tr><td>11</td><td>3600</td></tr></tbody></table>
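
The values in the table can be reproduced with a few lines of code. This sketch only applies the exponential backoff and the 1-hour cap described above. It does not model the adjustment for very slow endpoints, and `retry_interval` is a name chosen for this example.

```python
INITIAL_INTERVAL_S = 8    # Initial interval of 8 seconds.
MAX_INTERVAL_S = 60 * 60  # The interval will not exceed 1 hour.


def retry_interval(attempt: int) -> int:
    """Seconds to wait after delivery attempt number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(INITIAL_INTERVAL_S * 2 ** (attempt - 1), MAX_INTERVAL_S)


# Intervals for attempts 1 through 11.
intervals = [retry_interval(n) for n in range(1, 12)]
```

The uncapped value for attempt 10 would be 4096 seconds, which is why the interval stays at 3600 seconds from that attempt onwards.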

## Verifying Signed Events

When using a Signature Secret, the [X-Dt-Signature](#header) header is included and contains a [JWT](https://jwt.io/) signed with the Signature Secret. The JWT contains a checksum of the request body, which can be used to detect tampering.

The following steps sum up the process of verifying the received request at the receiving endpoint.

1. Extract the signed JWT from the HTTP header **X-Dt-Signature** of the received request.
2. Verify the JWT's signature with the **signature secret.**
3. Calculate a **SHA256** checksum over the entire request body.
4. Compare the body checksum with the `checksum_sha256` field contained in the JWT (there's a `checksum` field as well that uses SHA1 which is less secure than SHA256, and is kept only for backward compatibility).
5. If these checksums are identical, you can be certain that the event has not been tampered with and originated from your Data Connector.

The following snippet from our [Google Cloud Function example integration](https://docs.developer.disruptive-technologies.com/data-connectors/example-integrations/google-cloud-functions) implements this verification process.

{% tabs %}
{% tab title="Python 3.11" %}

```python
# This Python script is built on Flask, docs are available here:
# https://flask.palletsprojects.com/en/2.0.x/quickstart/

import os
import hashlib
import jwt                        # pip install pyjwt==2.7.0
from flask import Flask, request  # pip install Flask==2.3.2

app = Flask(__name__)

# Read environment variable.
SIGNATURE_SECRET = os.environ.get('DT_SIGNATURE_SECRET')


@app.route('/', methods=["POST"])
def data_connector_endpoint():
    # Extract the body as a bytestring and the signed JWT.
    # We'll use these values to verify the request.
    payload = request.get_data()
    token = request.headers['x-dt-signature']

    # Verify request origin and content integrity.
    if not verify_request(payload, token):
        return ('Could not verify request.', 400)

    # We now know the request came from DT Cloud, and the integrity
    # of the body has been verified. We can now handle the event safely.
    handle_event(request.get_json())

    # Respond with a 200 status code to ack the event. Any status codes
    # that are outside the 2xx range will nack the event, meaning it will
    # be retried later.
    return ('OK', 200)


def verify_request(body, token):
    """
    Verifies that the request originated from DT, and that the body
    hasn't been modified since it was sent. This is done by verifying
    that the checksum field of the JWT token matches the checksum of the
    request body, and that the JWT is signed with the signature secret.
    """

    # Decode the JWT, and verify that it was signed using the
    # signature secret. Also verifies that the algorithm used was HS256.
    try:
        payload = jwt.decode(token, SIGNATURE_SECRET, algorithms=["HS256"])
    except Exception as err:
        print(err)
        return False

    # Verify the request body checksum.
    m = hashlib.sha256()
    m.update(body)
    checksum = m.digest().hex()
    if payload["checksum_sha256"] != checksum:
        print('Checksum Mismatch')
        return False

    return True


def handle_event(body):
    """
    Processes the event itself. For this example, we will just
    decode a touch event, and print out the timestamp, device ID,
    and the device type.
    """
    # First, check if the event type is one of the event
    # types we're expecting.
    # As an example, we'll check for touch events here.
    if body['event']['eventType'] == 'touch':
        # Now that we know this is a device event, we can
        # check for the device type and device identifier
        # in the event metadata.
        device_type = body['metadata']['deviceType']
        device_id = body['metadata']['deviceId']
        timestamp = body['event']['data']['touch']['updateTime']

        print("Got touch event at {} from {} sensor with id {}".format(
            timestamp,
            device_type,
            device_id,
        ))
```

{% endtab %}

{% tab title="Python API" %}

```python
# This Python script is built on Flask, docs are available here:
# https://flask.palletsprojects.com/en/2.0.x/quickstart/

import os

# pip install dtintegrations==0.5.1
from dtintegrations import data_connector, provider
from flask import Flask, request  # pip install Flask==2.3.2

app = Flask(__name__)

# Read environment variable.
DT_SIGNATURE_SECRET = os.getenv('DT_SIGNATURE_SECRET')


@app.route('/', methods=['POST'])
def dataconnector_endpoint():
    # Validate and decode the incoming request.
    payload = data_connector.HttpPush.from_provider(
        request,
        provider=provider.FLASK,
        secret=DT_SIGNATURE_SECRET,
    )

    # Print the event data.
    handle_event(payload)

    # If all is well, return 200 response.
    return ('OK', 200)


def handle_event(payload):
    """
    Processes the event itself. For this example, we will just
    decode a touch event, and print out the timestamp, device ID,
    and the device type.
    """
    # First, check if the event type is one of the event
    # types we're expecting.
    # As an example, we'll check for touch events here.
    if payload.event.event_type == 'touch':
        # Now that we know this is a device event, we can
        # check for the device type and device identifier
        # in the event metadata.
        metadata = payload.get_device_metadata()
        if metadata is not None:
            device_type = metadata.device_type
            device_id = metadata.device_id
            timestamp = payload.event.data.timestamp

            print("Got touch event at {} from {} sensor with id {}".format(
                timestamp,
                device_type,
                device_id,
            ))
```

{% endtab %}

{% tab title="Node.js 16" %}

```javascript
const crypto = require('crypto')
const express = require('express')  // npm install express@4.17.2
const jwt = require('jsonwebtoken') // npm install jsonwebtoken@8.5.1

// Read environment variable
const signatureSecret = process.env.DT_SIGNATURE_SECRET

// dataConnectorEndpoint receives, validates, and returns a response 
// for the forwarded event.
const dataConnectorEndpoint = (req, res) => {
    
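    // Validate request origin and content integrity.
    // Note: JSON.stringify re-serializes the parsed body, so the checksum
    // only matches if the result is byte-identical to the body that was sent.
    let token = req.headers['x-dt-signature']
    if (verifyRequest(JSON.stringify(req.body), token) === false) {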
        res.sendStatus(400)
        return
    }
    
    // We now know the request came from DT Cloud, and the integrity
    // of the body has been verified. We can now handle the event safely.
    handleEvent(req.body)
    
    // Respond with a 200 status code to ack the event. Any status codes
    // that are outside the 2xx range will nack the event, meaning it will
    // be retried later.
    res.sendStatus(200);
};

// Verifies that the request originated from DT, and that the body
// hasn't been modified since it was sent. This is done by verifying
// that the checksum field of the JWT token matches the checksum of the
// request body, and that the JWT is signed with the signature secret.
const verifyRequest = (payload, token) => {
    // Decode the JWT, and verify that it was signed using the 
    // signature secret. Also verifies that the algorithm used was HS256.
    let decoded
    try {
        decoded = jwt.verify(token, signatureSecret, { algorithms: ["HS256"] })
    } catch(err) {
        console.log(err)
        return false
    }

    // Verify the request body checksum.
    const hash = crypto.createHash("sha256")
    const checksum = hash.update(payload).digest("hex")
    if (checksum !== decoded.checksum_sha256) {
        console.log('Checksum Mismatch')
        return false
    }

    return true
}

// handleEvent processes the event itself. For this example,
// we will just decode a touch event, and print out the timestamp,
// device ID, and the device type.
const handleEvent = (payload) => {
    // First, check if the event type is one of the event
    // types we're expecting. 
    // As an example, we'll check for touch events here.
    switch (payload.event.eventType) {
        case 'touch':
            // Now that we know this is a device event, we can 
            // check for the device type and device identifier 
            // in the event metadata.
            const deviceType = payload.metadata.deviceType
            const deviceId = payload.metadata.deviceId
            const timestamp = payload.event.data.touch.updateTime
    
            console.log(`Received touch event at ${timestamp} from ${deviceType} sensor with id ${deviceId}`)
            break
        default:
            break
    }
}

// Sets up a bare-bones server that listens on port 8080, and routes
// all requests to the path "/" to the `dataConnectorEndpoint` function
const app = express()
app.use(express.json())
app.listen(8080)
app.post('/', dataConnectorEndpoint)

```

{% endtab %}

{% tab title="Go 1.20" %}

```go
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "os"

    // go get github.com/golang-jwt/jwt/v5@v5.0.0
    jwt "github.com/golang-jwt/jwt/v5"
)

// Read environment variable
var signatureSecret = os.Getenv("DT_SIGNATURE_SECRET")

// DataConnectorEndpoint receives, validates, and returns a response
// for the forwarded event.
func DataConnectorEndpoint(w http.ResponseWriter, r *http.Request) {

    // Extract the body and the signed JWT.
    // We'll use these values to verify the request.
    tokenString := r.Header.Get("x-dt-signature")
    bodyBytes, err := io.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Validate request origin and content integrity.
    if err := verifyRequest(bodyBytes, tokenString); err != nil {
        fmt.Println(err)
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // We now know the request came from DT Cloud, and the integrity
    // of the body has been verified. We can now handle the event safely.
    if err := handleEvent(bodyBytes); err != nil {
        fmt.Println(err)
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Respond with a 200 status code to ack the event. Any status codes
    // that are outside the 2xx range will nack the event, meaning it
    // will be retried later.
    w.WriteHeader(http.StatusOK)
    _, _ = w.Write([]byte("OK"))
}

// verifyRequest verifies that the request originated from DT, and that
// the body hasn't been modified since it was sent. This is done by
// verifying that the checksum field of the JWT token matches the checksum
// of the request body, and that the JWT is signed with the signature secret.
func verifyRequest(bodyBytes []byte, tokenString string) error {
    // Decode the JWT, and verify that it was signed using the
    // signature secret. Also verifies the algorithm used to sign the JWT.
    token, err := jwt.Parse(
        tokenString,
        func(token *jwt.Token) (interface{}, error) {
            // Return our signature secret to verify that it was used to
            // sign the JWT.
            return []byte(signatureSecret), nil
        },
        jwt.WithValidMethods([]string{"HS256"}),
    )
    if err != nil {
        return err
    }

    // Verify the request body checksum.
    sha256Bytes := sha256.Sum256(bodyBytes)
    sha256String := hex.EncodeToString(sha256Bytes[:])
    claims := token.Claims.(jwt.MapClaims)
    if sha256String != claims["checksum_sha256"] {
        return fmt.Errorf("checksum mismatch")
    }

    return nil
}

// handleEvent processes the event itself. For this example,
// we will just decode a touch event, and print out the timestamp,
// device ID, and the device type.
func handleEvent(payload []byte) error {
    // The structure of the events we'll receive from a Data Connector.
    type Event struct {
        Event struct {
            EventId   string          `json:"eventId"`
            EventType string          `json:"eventType"`
            Data      json.RawMessage `json:"data"`
            Timestamp string          `json:"timestamp"`
        } `json:"event"`
        Labels   map[string]string `json:"labels"`
        Metadata map[string]string `json:"metadata"`
    }

    // The structure of the `Event.Data` field for a touch event.
    // We'll be using touch events for this example.
    type TouchData struct {
        Touch struct {
            Timestamp string `json:"updateTime"`
        } `json:"touch"`
    }

    // Decode the event
    var event Event
    if err := json.Unmarshal(payload, &event); err != nil {
        return err
    }

    // First, check if the event type is one of the event
    // types we're expecting.
    // As an example, we'll check for touch events here.
    switch event.Event.EventType {
    case "touch":
        // Now that we know this is a touch event, we can decode
        // the `Event.Data` field.
        var touchData TouchData
        err := json.Unmarshal(event.Event.Data, &touchData)
        if err != nil {
            return err
        }

        // Also, since we now know this is a device event, we can
        // check for the device type and device identifier
        // in the event metadata.
        deviceType := event.Metadata["deviceType"]
        deviceId := event.Metadata["deviceId"]
        timestamp := touchData.Touch.Timestamp

        fmt.Printf("Received touch event at %s from %s sensor with id %s\n",
            timestamp,
            deviceType,
            deviceId,
        )
    }

    return nil
}

func main() {
    // Sets up a bare-bones server that listens on port 8080, and
    // routes all requests to the path "/" to the
    // `DataConnectorEndpoint` function.

    http.HandleFunc("/", DataConnectorEndpoint)

    fmt.Println("Started listening on localhost:8080 ...")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Printf("Closed with error: %v\n", err)
    } else {
        fmt.Println("Closed server successfully")
    }
}
```

{% endtab %}
{% endtabs %}
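
To exercise an endpoint like the ones above without waiting for a real event, a signed request can be constructed locally. The sketch below uses only the Python standard library to build an HS256 JWT containing a `checksum_sha256` claim, and to verify it following the steps listed earlier in this section. It assumes that `checksum_sha256` is the only claim the verifier reads; tokens sent by DT Cloud may carry additional claims. `sign_test_request` and `verify_request` are hypothetical helpers written for this example.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the padding that JWTs omit."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_test_request(body: bytes, secret: str) -> str:
    """Build an HS256 JWT carrying the SHA256 checksum of `body`."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    claims = _b64url(json.dumps(
        {"checksum_sha256": hashlib.sha256(body).hexdigest()}
    ).encode())
    signing_input = f"{header}.{claims}".encode()
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{claims}.{_b64url(signature)}"


def verify_request(body: bytes, token: str, secret: str) -> bool:
    """Check the JWT signature, then compare the body checksum."""
    try:
        header_b64, claims_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(claims_b64))
        signature = _b64url_decode(signature_b64)
    except (ValueError, TypeError):
        return False  # Malformed token.
    if not isinstance(header, dict) or not isinstance(claims, dict):
        return False
    if header.get("alg") != "HS256":
        return False  # Only accept the expected signing algorithm.

    signing_input = f"{header_b64}.{claims_b64}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(signature, expected):
        return False  # Not signed with our secret.

    checksum = hashlib.sha256(body).hexdigest()
    claimed = str(claims.get("checksum_sha256", ""))
    return hmac.compare_digest(checksum.encode(), claimed.encode())


# Example with a made-up body and secret.
test_body = b'{"event": {"eventId": "abc", "eventType": "touch"}}'
test_token = sign_test_request(test_body, "my-test-secret")
```

The resulting token can be sent in the `X-Dt-Signature` header together with `test_body` to a locally running endpoint that is configured with the same secret. For production verification, a maintained JWT library as used in the tabs above remains the safer choice.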
