Eclipse Ditto does not send all thing events over the target connection


We are integrating Eclipse Ditto into a digital twin platform, but we have run into a problem during testing that we don't know how to fix.

For context, the goal is to receive the results of a simulation in 593 twins (Ditto things). We want to execute several simulation runs simultaneously, with each run sending 593 messages to a Kafka topic. For example, 6 runs produce 3558 messages in the topic.

In Eclipse Ditto we have created 593 things, one for each element returned by the simulation. We have set up a source connection to Kafka that applies a JavaScript mapping to each message. The mapping builds a merge command that updates the features of the twin corresponding to the message, together with an idSimulationRun value that indicates which simulation run the new data belongs to.

We also have a target connection to MQTT. Through this connection we send all the events of every Ditto thing so that they can be stored in a database. No mapping is applied on this connection; the events are sent as-is in Ditto Protocol.

The problem is the following. Initially we were using Eclipse Ditto version 2.3 and, although all messages were received over MQTT, it was incredibly slow: processing 6 runs took approximately 20 minutes. We know the simulation is not the bottleneck because all messages are available in the Kafka topic within a few seconds.

We decided to upgrade to the latest version (currently 3.2), which is much faster. According to the metrics and logs, this version consumes and maps all the messages in the Kafka topic, so the things appear to be updated. However, not all update events are sent over the target connection: fewer than 1000 of the 3558 events are published over MQTT. According to the logs, the messages are being dropped as a result of a backpressure strategy.

Following this page, we changed some environment variables but still get the same problem. They are listed below, in the Helm values.yaml that we use to install Eclipse Ditto. A single run works correctly; the failures start when several runs are sent simultaneously. We also tried using Kafka for the target connection, with the same result. Likewise, we installed Ditto on two different computers with more than enough resources, and the result was the same on both.

Below are the connections we have set up, their logs and metrics, and the Helm values.yaml.

Source connection:

{
  "name": "connection-for-simulation",
  "connectionType": "kafka",
  "connectionStatus": "open",
  "failoverEnabled": true,
  "uri": "tcp://KAFKAIP",
  "specificConfig": {
    "bootstrapServers": "KAFKAIP",
    "saslMechanism": "plain"
  },
  "sources": [
    {
      "addresses": [
        "riego"
      ],
      "consumerCount": 1,
      "qos": 1,
      "authorizationContext": [
        "nginx:ditto"
      ],
      "headerMapping": {
        "id": "{{ entity:id }}",
        "namespace": "{{ entity:namespace }}",
        "connection": "{{ connection:id }}"
      },
      "payloadMapping": [
        "javascript"
      ]
    }
  ],
  "mappingDefinitions": {
    "javascript": {
      "mappingEngine": "JavaScript",
      "options": {
        "incomingScript": "function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) { const jsonData = JSON.parse(textPayload); const degree = jsonData[0]; const data = jsonData[1]; var id = 'irrigation_simulation:' + data.Id; headers = Object.assign(headers, { 'Content-Type': 'application/merge-patch+json' }); var features = { idSimulationRun: { properties: { value: 'degree_' + degree } } }; Object.keys(data).forEach((key) => { if (key !== 'Id') { var obj = {}; obj[key] = { properties: { value: data[key] } }; Object.assign(features, obj); } }); const thing = { attributes: { _parents: 'pivot:irrigation_simulation' }, features: features }; return Ditto.buildDittoProtocolMsg( 'pivot', id, 'things', 'twin', 'commands', 'merge', '/', headers, thing ); }"
      }
    }
  }
}
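
For readability, the one-line incomingScript can be expanded and exercised locally. The following is a minimal sketch meant for Node.js, not for pasting into the connection as-is: the `Ditto` object here is a hypothetical stand-in that simply echoes its arguments (inside Ditto the helper is provided by the mapping engine), and the sample payload with its `hydrant_1` and `pressure` names is invented for illustration. It assumes each Kafka message is a two-element JSON array of the form [degree, data].

```javascript
// Hypothetical stand-in for the helper that Ditto's mapping engine provides.
// It only echoes its arguments so the mapping can be inspected outside Ditto.
const Ditto = {
  buildDittoProtocolMsg(namespace, name, group, channel, criterion, action, path, dittoHeaders, value) {
    return { namespace, name, group, channel, criterion, action, path, dittoHeaders, value };
  },
};

function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {
  // Assumed payload shape: [degree, { Id: ..., <featureName>: <value>, ... }]
  const [degree, data] = JSON.parse(textPayload);

  // Mark the command as a merge patch (copying the headers instead of mutating them).
  const mergedHeaders = Object.assign({}, headers, {
    'Content-Type': 'application/merge-patch+json',
  });

  // One feature per payload key (except Id), plus the simulation run marker.
  const features = { idSimulationRun: { properties: { value: 'degree_' + degree } } };
  Object.keys(data).forEach((key) => {
    if (key !== 'Id') {
      features[key] = { properties: { value: data[key] } };
    }
  });

  const thing = { attributes: { _parents: 'pivot:irrigation_simulation' }, features };
  return Ditto.buildDittoProtocolMsg(
    'pivot', 'irrigation_simulation:' + data.Id,
    'things', 'twin', 'commands', 'merge', '/', mergedHeaders, thing
  );
}

// Invented sample message, only to show the resulting merge command.
const msg = mapToDittoProtocolMsg(
  {}, JSON.stringify([3, { Id: 'hydrant_1', pressure: 2.5 }]), null, 'application/json'
);
console.log(JSON.stringify(msg, null, 2));
```

Running it prints the merge command that would be applied to the thing pivot:irrigation_simulation:hydrant_1, which makes it easy to confirm that every message touches the idSimulationRun feature and therefore produces one event per message on the target side.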

Target connection:

{
  "name": "mqtt-connection",
  "connectionType": "mqtt-5",
  "connectionStatus": "open",
  "failoverEnabled": true,
  "uri": "tcp://MQTTIP",
  "sources": [],
  "targets": [
    {
      "address": "opentwins/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}",
      "qos": 1,
      "topics": [
        "_/_/things/twin/events?extraFields=thingId,attributes/_parents,features/idSimulationRun/properties/value",
        "_/_/things/live/messages",
        "_/_/things/live/commands"
      ],
      "authorizationContext": [
        "nginx:ditto"
      ]
    }
  ]
}

Logs source connection: https://pastebin.com/zMKetKtL

Metrics source connection:

{
  "type": "connectivity.responses:retrieveConnectionMetrics",
  "status": 200,
  "connectionId": "59ef4006-9b18-40f4-8d45-8117920e5d11",
  "connectionMetrics": {
    "inbound": {
      "consumed": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.125Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "mapped": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.461Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "dropped": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "enforced": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.461Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "acknowledged": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.508Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "throttled": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 2,
          "PT24H": 2,
          "lastMessageAt": "2023-04-17T12:54:03.461Z"
        }
      }
    },
    "outbound": {
      "dispatched": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.507Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 904,
          "PT24H": 904,
          "lastMessageAt": "2023-04-17T12:54:03.509Z"
        }
      },
      "filtered": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "mapped": {
        "success": {
          "PT1M": 0,
          "PT1H": 2654,
          "PT24H": 2654,
          "lastMessageAt": "2023-04-17T12:54:03.883Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "dropped": {
        "success": {
          "PT1M": 0,
          "PT1H": 2654,
          "PT24H": 2654,
          "lastMessageAt": "2023-04-17T12:54:03.883Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "published": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "acknowledged": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      }
    }
  },
  "containsFailures": true,
  "sourceMetrics": {
    "addressMetrics": {
      "riego": {
        "consumed": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.125Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "mapped": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.461Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "dropped": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "enforced": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.461Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "acknowledged": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.508Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "throttled": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 2,
            "PT24H": 2,
            "lastMessageAt": "2023-04-17T12:54:03.461Z"
          }
        }
      }
    }
  },
  "targetMetrics": {
    "addressMetrics": {
      "_responses": {
        "dispatched": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.507Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 904,
            "PT24H": 904,
            "lastMessageAt": "2023-04-17T12:54:03.509Z"
          }
        },
        "filtered": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "mapped": {
          "success": {
            "PT1M": 0,
            "PT1H": 2654,
            "PT24H": 2654,
            "lastMessageAt": "2023-04-17T12:54:03.883Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "dropped": {
          "success": {
            "PT1M": 0,
            "PT1H": 2654,
            "PT24H": 2654,
            "lastMessageAt": "2023-04-17T12:54:03.883Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "published": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "acknowledged": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        }
      }
    }
  }
}

Logs target connection: https://pastebin.com/VSApKH0g

Metrics target connection:

{
  "type": "connectivity.responses:retrieveConnectionMetrics",
  "status": 200,
  "connectionId": "bf7da601-b483-4069-b721-d2e9a456388e",
  "connectionMetrics": {
    "inbound": null,
    "outbound": {
      "dispatched": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.480Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 2948,
          "PT24H": 2948,
          "lastMessageAt": "2023-04-17T12:54:03.484Z"
        }
      },
      "filtered": {
        "success": {
          "PT1M": 0,
          "PT1H": 3558,
          "PT24H": 3558,
          "lastMessageAt": "2023-04-17T12:54:03.480Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "mapped": {
        "success": {
          "PT1M": 0,
          "PT1H": 610,
          "PT24H": 610,
          "lastMessageAt": "2023-04-17T12:54:05.199Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "dropped": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "published": {
        "success": {
          "PT1M": 0,
          "PT1H": 610,
          "PT24H": 610,
          "lastMessageAt": "2023-04-17T12:54:05.250Z"
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      },
      "acknowledged": {
        "success": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        },
        "failure": {
          "PT1M": 0,
          "PT1H": 0,
          "PT24H": 0,
          "lastMessageAt": null
        }
      }
    }
  },
  "containsFailures": true,
  "sourceMetrics": {
    "addressMetrics": null
  },
  "targetMetrics": {
    "addressMetrics": {
      "opentwins2/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}": {
        "dispatched": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.480Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "filtered": {
          "success": {
            "PT1M": 0,
            "PT1H": 3558,
            "PT24H": 3558,
            "lastMessageAt": "2023-04-17T12:54:03.480Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "mapped": {
          "success": {
            "PT1M": 0,
            "PT1H": 610,
            "PT24H": 610,
            "lastMessageAt": "2023-04-17T12:54:05.199Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "dropped": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "published": {
          "success": {
            "PT1M": 0,
            "PT1H": 610,
            "PT24H": 610,
            "lastMessageAt": "2023-04-17T12:54:05.250Z"
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "acknowledged": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        }
      },
      "_responses": {
        "dispatched": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 2948,
            "PT24H": 2948,
            "lastMessageAt": "2023-04-17T12:54:03.484Z"
          }
        },
        "filtered": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "mapped": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "dropped": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "published": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        },
        "acknowledged": {
          "success": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          },
          "failure": {
            "PT1M": 0,
            "PT1H": 0,
            "PT24H": 0,
            "lastMessageAt": null
          }
        }
      }
    }
  }
}
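
As a sanity check, the counters in the two metrics responses can be compared against each other. The sketch below only restates numbers copied from the PT1H windows above; the variable names are made up for illustration, and the interpretation in the comments is a reading of the counters, not a statement about Ditto internals.

```javascript
// Counters copied from the PT1H windows of the two metrics responses above.
const expectedEvents = 6 * 593; // 6 runs x 593 things

// Outbound side of the Kafka (source) connection.
const source = { dispatched: 3558, mapped: 2654, responsesFailed: 904 };
// Outbound side of the MQTT (target) connection.
const target = { dispatched: 3558, published: 610, responsesFailed: 2948 };

// Signals that were dispatched to a connection but never made it through.
const missingAtSource = source.dispatched - source.mapped;
const missingAtTarget = target.dispatched - target.published;

// Every Kafka message was dispatched on both connections.
console.log(expectedEvents === source.dispatched && expectedEvents === target.dispatched);
// On both connections the gap equals the failed "_responses" dispatch count.
console.log(missingAtSource === source.responsesFailed);
console.log(missingAtTarget === target.responsesFailed);
// Share of events that actually reached MQTT.
console.log((100 * target.published / expectedEvents).toFixed(1) + '% published');
```

The gap between dispatched and published events on the MQTT connection matches the number of failed dispatches reported under "_responses", which is consistent with the dropped messages seen in the logs.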

values.yaml:

connectivity:
  extraEnv:
    - name: MQTT_CONSUMER_THROTTLING_ENABLED
      value: "false"
    - name: MQTT_CONSUMER_THROTTLING_LIMIT
      value: "100000"
    - name: KAFKA_CONSUMER_THROTTLING_ENABLED
      value: "false"
    - name: KAFKA_CONSUMER_THROTTLING_LIMIT
      value: "100000"
    - name: CONNECTIVITY_MQTT_MAX_QUEUE_SIZE
      value: "100000"
    - name: CONNECTIVITY_KAFKA_MAX_QUEUE_SIZE
      value: "100000"
  resources:
    requests:
      cpu: 200m
    limits:
      memory: "2Gi"
gateway:
  resources:
    requests:
      cpu: 200m
    limits:
      memory: "768Mi"
nginx:
  service:
    type: NodePort
    nodePort: 30525
  resources:
    requests:
      cpu: 50m
    limits:
      cpu: 150m
      memory: "32Mi"
policies:
  resources:
    requests:
      cpu: 200m
    limits:
      memory: "768Mi"
swaggerui:
  enabled: false
things:
  resources:
    requests:
      cpu: 200m
    limits:
      memory: "768Mi"
thingsSearch:
  resources:
    requests:
      cpu: 200m
    limits:
      memory: "768Mi"
mongodb:
  enabled: false

Thanks in advance!

Best answer:

That is a very well-written question; thanks for the detailed input.

I think the problem you have cannot be solved with the parameters you adjusted through the Helm environment variables. The dropping occurs because the target connection's "message mapping" (the built-in one, not a custom JavaScript mapping) is too slow to keep up with the volume of processed changes.
Alternatively, the retrieval of the "extra" fields is too slow, since this enrichment also sends a lot of additional messages through the Ditto cluster.

The buffer size of this "message mapping" can be adjusted, e.g. by configuring the environment variable CONNECTIVITY_SIGNAL_ENRICHMENT_BUFFER_SIZE (default: 500).
What you probably need is to parallelise the outbound mapping by configuring the processorPoolSize of the connection you create; the default should be 5.
You would also need to increase the CONNECTIVITY_MESSAGE_MAPPING_MAX_POOL_SIZE environment variable to the same value (e.g. 10).
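
In the values.yaml from the question, these two variables could be added to the existing connectivity.extraEnv list. This is only a sketch: the variable names are the ones mentioned above, the pool size of 10 is the example value from this answer, and the buffer size of 5000 is an arbitrary illustrative number, not a recommendation.

```yaml
connectivity:
  extraEnv:
    # Buffer used while enriching events with extraFields.
    # 500 is the default named above; 5000 is an arbitrary example.
    - name: CONNECTIVITY_SIGNAL_ENRICHMENT_BUFFER_SIZE
      value: "5000"
    # Should match the processorPoolSize set in the connection JSON.
    - name: CONNECTIVITY_MESSAGE_MAPPING_MAX_POOL_SIZE
      value: "10"
```

The existing throttling and queue-size entries can stay as they are; these entries are appended to the same list.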

However, what is probably required is to scale your Ditto services better. The memory you configured for e.g. the things service is far too low for what you are doing. Did you check whether "things" crashes or consumes a lot of CPU?

I would also scale connectivity to at least 3 instances and set the clientCount of your connections to 3 (the default is 1).
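
As a rough sketch, clientCount and processorPoolSize are top-level fields of the connection JSON, so the MQTT connection from the question might be changed as follows. Only the relevant fields are shown; the sources and targets arrays would stay as in the question, and the values 3 and 10 are the examples used in this answer.

```json
{
  "name": "mqtt-connection",
  "connectionType": "mqtt-5",
  "connectionStatus": "open",
  "failoverEnabled": true,
  "uri": "tcp://MQTTIP",
  "clientCount": 3,
  "processorPoolSize": 10
}
```

A clientCount above 1 only helps if there are that many connectivity instances to spread the clients over. In the Helm chart the number of instances is presumably set per service (for example through a connectivity.replicaCount value); that key name is an assumption here, so check the chart version you are using.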

The other option is to "couple" the outbound MQTT connection with the inbound Kafka connection: request acknowledgements (ACKs) at the source connection and issue the requested ACKs automatically at the target connection.
With that coupling, the target connection can throttle the source connection via backpressure, which should result in no messages being dropped.
At least when configured correctly.
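
A minimal sketch of that coupling, assuming the acknowledgement fields of the Ditto connection model (acknowledgementRequests on a source, issuedAcknowledgementLabel on a target). The label name mqtt-published is hypothetical, and the prefix is the ID of the MQTT connection taken from the metrics in the question; verify both fields against the documentation of your Ditto version before relying on this.

In the Kafka source (other source fields unchanged), request the acknowledgement:

```json
{
  "addresses": ["riego"],
  "qos": 1,
  "acknowledgementRequests": {
    "includes": ["bf7da601-b483-4069-b721-d2e9a456388e:mqtt-published"]
  }
}
```

In the MQTT target, issue it once the event has been published:

```json
{
  "address": "opentwins/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}",
  "qos": 1,
  "topics": [
    "_/_/things/twin/events?extraFields=thingId,attributes/_parents,features/idSimulationRun/properties/value"
  ],
  "authorizationContext": ["nginx:ditto"],
  "issuedAcknowledgementLabel": "{{connection:id}}:mqtt-published"
}
```

With this in place, a Kafka message would only be acknowledged after the corresponding event was published to MQTT, which is what lets the target slow the source down instead of dropping events.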

You also seem to have found a bug: for the "target" connection, metrics are reported under the "_responses" category, which should only happen for connections with a source. I will create an issue for that in the project's GitHub repo.