API Reference

Summary

Client([client_id, protocol, username, ...])

Low-level MQTT client

ClientOptions([max_inflight_messages, ...])

Class of optional settings for an MQTT client

ConnackCode(value[, names, module, ...])

MQTT Connection Acknowledgement codes

DBirth(timestamp, seq, metrics)

Class representing a DBirth payload

DCmd(timestamp, metrics)

Class representing a DCmd payload

DData(timestamp, seq, metrics)

Class representing a DData payload

DDeath(timestamp, seq)

Class representing a DDeath payload

DataType(value[, names, module, qualname, ...])

Enumeration of Sparkplug B datatypes

Device(device_id, metrics[, cmd_callback])

Class representing a Device in Sparkplug B

EdgeNode(group_id, edge_node_id, metrics[, ...])

Class representing an EdgeNode in Sparkplug B

ErrorCode(value[, names, module, qualname, ...])

MQTT error codes

MQTTError

Error from MQTT client

MQTTProtocol(value[, names, module, ...])

MQTT protocol enum

MULTI_LEVEL_WILDCARD

Constant for multi-level MQTT topic wildcard

Message(topic, payload, qos, retain)

Class representing a Sparkplug B message

MessageType(value[, names, module, ...])

Sparkplug B message type enum

Metric(timestamp, name, datatype[, value, ...])

Class representing a Sparkplug B metric

MetricValue

Type annotation for the types a Metric's value attribute can take

NBirth(timestamp, seq, metrics)

Class representing an NBirth payload

NCmd(timestamp, metrics)

Class representing an NCmd payload

NData(timestamp, seq, metrics)

Class representing an NData payload

NDeath(timestamp, bd_seq_metric)

Class representing an NDeath payload

QoS(value[, names, module, qualname, type, ...])

MQTT quality of service enum

SINGLE_LEVEL_WILDCARD

Constant for single-level MQTT topic wildcard

State(timestamp, online)

Class representing a State payload

TLSConfig([ca_certs, certfile, keyfile, ...])

TLS configuration class

Topic([group_id, message_type, ...])

Class representing a Sparkplug B topic

Transport(value[, names, module, qualname, ...])

MQTT transport enum

WSConfig([path, headers])

Websockets configuration class

get_current_timestamp()

Returns current time in a Sparkplug B compliant format

Interfaces

class pysparkplug.Client(client_id=None, protocol=MQTTProtocol.MQTT_V311, username=None, password=None, transport_config=None, client_options=ClientOptions(max_inflight_messages=20, max_queued_messages=0, message_retry_timeout=5, reconnection_delay_min=1, reconnection_delay_max=120, reconnect_on_failure=True))[source]

Low-level MQTT client

Parameters:
  • client_id (str | None) – the unique client id string used when connecting to the broker

  • protocol (MQTTProtocol) – the version of the MQTT protocol to use for this client

  • username (str | None) – the username used for broker authentication

  • password (str | None) – the password used for broker authentication

  • transport_config (TLSConfig | WSConfig | None) – a config object defining the transport layer protocol the client will use to connect to the broker

  • client_options (ClientOptions) – a config object defining various options for the client

connect(host, *, port=1883, keepalive=60, bind_address='', blocking=False, callback=None)[source]

Connect client to the broker

Parameters:
  • host (str) – the hostname or IP address of the remote broker

  • port (int) – the port of the broker

  • keepalive (int) – maximum period in seconds allowed between communications with the broker

  • bind_address (str) – the IP address of a local network interface to bind this client to, assuming multiple interfaces exist

  • blocking (bool) – whether to connect in a blocking way or in a separate thread

  • callback (Callable[[Self], None] | None) – a custom callback to be called each time the client successfully connects

Return type:

None

disconnect()[source]

Disconnect from the broker cleanly, i.e. without causing the broker to send the will message.

Return type:

None

publish(message, *, include_dtypes=False)[source]

Publish a message to the broker

Parameters:
  • message (Message) – the message to be published

  • include_dtypes (bool) – whether or not to include the dtypes of the message

Return type:

None

set_will(message)[source]

Set the specified message as the client's last will and testament

Parameters:

message (Message | None) – the message to be registered with the broker, or None, which clears the will

Return type:

None

subscribe(topic, qos, callback)[source]

Subscribe to the specified topic

Parameters:
  • topic (Topic) – the topic to be subscribed to

  • qos (QoS) – the qos of the subscription

  • callback (Callable[[Self, Message], None]) – the callback to run when messages are received for this subscription

Return type:

None

unsubscribe(topic)[source]

Unsubscribe from the specified topic

Parameters:

topic (Topic) – the topic to be unsubscribed from

Return type:

None

class pysparkplug.EdgeNode(group_id, edge_node_id, metrics, client=None, cmd_callback=<function _default_cmd_callback>)[source]

Class representing an EdgeNode in Sparkplug B

Parameters:
  • group_id (str) – the Group ID element of the topic namespace provides for a logical grouping of Sparkplug Edge Nodes into the MQTT Server and back out to the consuming Sparkplug Host Applications

  • edge_node_id (str) – the edge_node_id element of the Sparkplug topic namespace uniquely identifies the Sparkplug Edge Node within the infrastructure

  • metrics (Iterable[Metric]) – the metrics associated with this edge node

  • client (Optional[Client]) – the low-level MQTT client used by this edge node for connecting to the broker

  • cmd_callback (Callable[[Self, Message], None]) – the callback function to execute when an NCMD payload is received

connect(host, *, port=1883, keepalive=60, bind_address='', blocking=False)[source]

Connect edge node to the broker

Parameters:
  • host (str) – the hostname or IP address of the remote broker

  • port (int) – the port of the broker

  • keepalive (int) – maximum period in seconds allowed between communications with the broker

  • bind_address (str) – the IP address of a local network interface to bind this client to, assuming multiple interfaces exist

  • blocking (bool) – whether to connect in a blocking way or in a separate thread

Return type:

None

deregister(device_id)[source]

Remove a device from the edge node, sending a DDeath if the edge node is online.

Parameters:

device_id (str) – the id of the device to be deregistered

Return type:

None

property devices: Dict[str, Device][source]

Returns a copy of the devices for this edge node in a dictionary

disconnect()[source]

Disconnect from the broker cleanly, i.e. without causing the broker to send the will message.

Return type:

None

property metrics: Dict[str, Metric][source]

Returns a copy of the metrics for this edge node in a dictionary

register(device)[source]

Register a device to the edge node. This can be done while the edge node is connected.

Parameters:

device (Device) – the device to register to the edge node

Return type:

None

subscribe(topic, qos, callback)[source]

Subscribe to the specified topic

Parameters:
  • topic (Topic) – the topic to be subscribed to

  • qos (QoS) – the qos of the subscription

  • callback (Callable[[Self, Message], None]) – the callback to run when messages are received for this subscription

Return type:

None

unsubscribe(topic)[source]

Unsubscribe from the specified topic

Parameters:

topic (Topic) – the topic to be unsubscribed from

Return type:

None

update(metrics)[source]

Update some (or all) of the edge node’s metrics

Parameters:

metrics (Iterable[Metric]) – an iterable of metrics to be updated

Return type:

None

update_device(device_id, metrics)[source]

Update some (or all) of the metrics associated with the provided device_id

Parameters:
  • device_id (str) – the id of the device to be updated

  • metrics (Iterable[Metric]) – an iterable of metrics to be updated

Return type:

None

class pysparkplug.Device(device_id, metrics, cmd_callback=<function _default_cmd_callback>)[source]

Class representing a Device in Sparkplug B

Parameters:
  • device_id (str) – the device_id element of the Sparkplug topic namespace identifies a device attached (physically or logically) to the Sparkplug Edge Node

  • metrics (Iterable[Metric]) – the metrics associated with this device

  • cmd_callback (Callable[[EdgeNode, Message], None]) – the callback function to execute when a DCMD payload is received

property metrics: Dict[str, Metric][source]

Returns a copy of the metrics for this device in a dictionary

update(metrics)[source]

Update some (or all) of the device’s metrics

Parameters:

metrics (Iterable[Metric]) – an iterable of metrics to be updated

Return type:

None

Nuts & Bolts

class pysparkplug.Message(topic, payload, qos, retain)[source]

Class representing a Sparkplug B message

Parameters:
  • topic (Topic) – the Sparkplug B topic associated with this message

  • payload (Payload) – the Sparkplug B payload associated with this message

  • qos (QoS) – the MQTT quality of service associated with this message

  • retain (bool) – if set to True, the message will be set as the “last known good”/retained message for the topic

classmethod from_mqtt_message(mqtt_message, *, birth=None)[source]

Constructs a Message object from a Paho MQTTMessage object

Parameters:
  • mqtt_message (MQTTMessage) – the Paho MQTTMessage object to construct from

  • birth (Birth | None) – the Birth object associated with this message, for decoding aliases and dropped dtypes

Return type:

Self

class pysparkplug.Topic(group_id=None, message_type=None, edge_node_id=None, device_id=None, sparkplug_host_id=None)[source]

Class representing a Sparkplug B topic

Parameters:
  • group_id (str | None) – the Group ID element of the topic namespace provides for a logical grouping of Sparkplug Edge Nodes into the MQTT Server and back out to the consuming Sparkplug Host Applications

  • message_type (MessageType | Literal['+'] | Literal['#'] | None) – the message_type element of the topic namespace provides an indication as to how to handle the MQTT payload of the message

  • edge_node_id (str | None) – the edge_node_id element of the Sparkplug topic namespace uniquely identifies the Sparkplug Edge Node within the infrastructure

  • device_id (str | None) – the device_id element of the Sparkplug topic namespace identifies a device attached (physically or logically) to the Sparkplug Edge Node

  • sparkplug_host_id (str | None) – the unique identifier of the Sparkplug Host Application

classmethod from_str(topic)[source]

Construct a Topic object from a topic string

Parameters:

topic (str) – the Sparkplug B topic in raw string form

Returns:

a Topic object

Return type:

Self
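
As an illustration of the raw string form that from_str accepts, the sketch below splits a Sparkplug B topic string into its elements. It is not pysparkplug's implementation: ParsedTopic and parse_topic are hypothetical names, and the sketch assumes the Sparkplug 3.0 layout spBv1.0/<group_id>/<message_type>/<edge_node_id>[/<device_id>], with spBv1.0/STATE/<sparkplug_host_id> for host application state topics.

```python
from dataclasses import dataclass
from typing import Optional

NAMESPACE = "spBv1.0"  # Sparkplug B topic namespace prefix


@dataclass(frozen=True)
class ParsedTopic:
    """Hypothetical stand-in for the fields a Topic carries."""

    group_id: Optional[str] = None
    message_type: Optional[str] = None
    edge_node_id: Optional[str] = None
    device_id: Optional[str] = None
    sparkplug_host_id: Optional[str] = None


def parse_topic(topic: str) -> ParsedTopic:
    """Split a raw topic string into Sparkplug B elements (illustrative only)."""
    parts = topic.split("/")
    if parts[0] != NAMESPACE:
        raise ValueError(f"not a Sparkplug B topic: {topic!r}")
    # Host application state topics: spBv1.0/STATE/<sparkplug_host_id>
    if len(parts) == 3 and parts[1] == "STATE":
        return ParsedTopic(message_type="STATE", sparkplug_host_id=parts[2])
    if len(parts) not in (4, 5):
        raise ValueError(f"unexpected number of topic levels: {topic!r}")
    # The device_id level is optional; node-level topics have only four levels
    device_id = parts[4] if len(parts) == 5 else None
    return ParsedTopic(parts[1], parts[2], parts[3], device_id)
```

A node-level topic such as spBv1.0/plant1/NDATA/node7 would leave device_id as None, mirroring the optional arguments of the Topic constructor.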

class pysparkplug.Metric(timestamp, name, datatype, value=None, alias=None, is_historical=False, is_transient=False, is_null=False)[source]

Class representing a Sparkplug B metric

Parameters:
  • timestamp (int | None) – timestamp associated with data acquisition time

  • name (str | None) – name associated with this metric

  • datatype (DataType) – datatype associated with this metric

  • value (int | float | bool | str | bytes | datetime | None) – the value of the metric

  • alias (int | None) – an integer used to map to the metric’s name

  • is_historical (bool) – whether this is historical data that should not update the real-time tag

  • is_transient (bool) – tells consuming clients such as MQTT Engine to not store this as a tag

  • is_null (bool) – whether the value is null; state this explicitly rather than using a sentinel such as -1 or false

classmethod from_pb(metric)[source]

Constructs a Metric object from a Protobuf metric

Parameters:

metric (Metric) – the Protobuf metric to construct from

Returns:

a Metric object

Return type:

Self

to_pb(include_dtype)[source]

Returns a Protobuf metric

Parameters:

include_dtype (bool) – whether or not to include dtypes in the Protobuf metric

Return type:

Metric

Payloads

class pysparkplug.NBirth(timestamp, seq, metrics)[source]

Class representing an NBirth payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • seq (int) – sequence number

  • metrics (Iterable[Metric]) – metrics associated with this payload

classmethod decode(raw, *, birth=None)[source]

Construct a Birth object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a Birth object

  • birth (Birth | None) – unused input since Birth payloads are self-contained

Returns:

Birth object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode Birth object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

get_dtype(name)[source]

Get the dtype of the metric with the requested name

Parameters:

name (str) – the name of the metric we want the dtype of

Returns:

the dtype of the metric

Return type:

DataType

get_name(alias)[source]

Get the name of the metric with the requested alias

Parameters:

alias (int) – the alias of the metric we want the name of

Returns:

the name of the metric

Return type:

str
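
The two lookups above exist because later payloads may refer to a metric by alias only, or omit its dtype. A minimal sketch of how such lookup tables could be built from a birth payload's metrics follows. BirthMetric and BirthIndex are hypothetical names used for illustration, and dtypes are plain strings here rather than DataType members; this is not how pysparkplug necessarily stores them.

```python
from typing import Dict, Iterable, NamedTuple, Optional


class BirthMetric(NamedTuple):
    """Hypothetical, trimmed-down metric: just what the lookups need."""

    name: str
    datatype: str
    alias: Optional[int] = None


class BirthIndex:
    """Illustrative lookup tables built once from a birth payload's metrics."""

    def __init__(self, metrics: Iterable[BirthMetric]) -> None:
        self._dtype_by_name: Dict[str, str] = {}
        self._name_by_alias: Dict[int, str] = {}
        for metric in metrics:
            self._dtype_by_name[metric.name] = metric.datatype
            # Only metrics that declare an alias can be resolved by alias later
            if metric.alias is not None:
                self._name_by_alias[metric.alias] = metric.name

    def get_name(self, alias: int) -> str:
        """Return the metric name registered for this alias."""
        return self._name_by_alias[alias]

    def get_dtype(self, name: str) -> str:
        """Return the dtype registered for this metric name."""
        return self._dtype_by_name[name]
```

With an index like this, a data payload carrying only alias 1 can be resolved to its name first and then to its dtype.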

class pysparkplug.DBirth(timestamp, seq, metrics)[source]

Class representing a DBirth payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • seq (int) – sequence number

  • metrics (Iterable[Metric]) – metrics associated with this payload

classmethod decode(raw, *, birth=None)[source]

Construct a Birth object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a Birth object

  • birth (Birth | None) – unused input since Birth payloads are self-contained

Returns:

Birth object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode Birth object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

get_dtype(name)[source]

Get the dtype of the metric with the requested name

Parameters:

name (str) – the name of the metric we want the dtype of

Returns:

the dtype of the metric

Return type:

DataType

get_name(alias)[source]

Get the name of the metric with the requested alias

Parameters:

alias (int) – the alias of the metric we want the name of

Returns:

the name of the metric

Return type:

str

class pysparkplug.NData(timestamp, seq, metrics)[source]

Class representing an NData payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • seq (int) – sequence number

  • metrics (Iterable[Metric]) – metrics associated with this payload
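
For context on the seq parameter: the Sparkplug B specification defines the sequence number as a value from 0 to 255 that increments with each message from an edge node and wraps back to 0. The helper below is a small sketch of that rule; next_seq is a hypothetical name, and whether callers or the EdgeNode class manage the counter in pysparkplug is not stated here.

```python
def next_seq(seq: int) -> int:
    """Return the sequence number that follows `seq`, wrapping after 255."""
    if not 0 <= seq <= 255:
        raise ValueError("Sparkplug B sequence numbers are in the range 0-255")
    return (seq + 1) % 256
```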

classmethod decode(raw, *, birth=None)[source]

Construct a Payload object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a Payload object

  • birth (Birth | None) – the Birth object associated with this message, for decoding aliases and dropped dtypes

Returns:

Payload object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode Payload object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

class pysparkplug.DData(timestamp, seq, metrics)[source]

Class representing a DData payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • seq (int) – sequence number

  • metrics (Iterable[Metric]) – metrics associated with this payload

classmethod decode(raw, *, birth=None)[source]

Construct a Payload object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a Payload object

  • birth (Birth | None) – the Birth object associated with this message, for decoding aliases and dropped dtypes

Returns:

Payload object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode Payload object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

class pysparkplug.NCmd(timestamp, metrics)[source]

Class representing an NCmd payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • metrics (Iterable[Metric]) – metrics associated with this payload

classmethod decode(raw, *, birth=None)[source]

Construct a Payload object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a Payload object

  • birth (Birth | None) – the Birth object associated with this message, for decoding aliases and dropped dtypes

Returns:

Payload object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode Payload object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

class pysparkplug.DCmd(timestamp, metrics)[source]

Class representing a DCmd payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • metrics (Iterable[Metric]) – metrics associated with this payload

classmethod decode(raw, *, birth=None)[source]

Construct a Payload object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a Payload object

  • birth (Birth | None) – the Birth object associated with this message, for decoding aliases and dropped dtypes

Returns:

Payload object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode Payload object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

class pysparkplug.NDeath(timestamp, bd_seq_metric)[source]

Class representing an NDeath payload

Parameters:
  • timestamp (int | None) – timestamp at message sending time

  • bd_seq_metric (Metric) – birth death sequence number metric

classmethod decode(raw, *, birth=None)[source]

Construct an NDeath object from bytes

Parameters:
  • raw (bytes) – bytes to decode into an NDeath object

  • birth (Birth | None) – unused input since NDeaths don’t have any metrics with aliases or dropped dtypes

Returns:

NDeath object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode NDeath object into bytes

Parameters:

include_dtypes (bool) – whether or not to include dtypes

Returns:

encoded payload in bytes

Return type:

bytes

class pysparkplug.DDeath(timestamp, seq)[source]

Class representing a DDeath payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • seq (int) – sequence number

classmethod decode(raw, *, birth=None)[source]

Construct a DDeath object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a DDeath object

  • birth (Birth | None) – unused input since DDeaths don’t have any metrics

Returns:

DDeath object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode DDeath object into bytes

Parameters:

include_dtypes (bool) – unused input since DDeaths have no metrics

Returns:

encoded payload in bytes

Return type:

bytes

class pysparkplug.State(timestamp, online)[source]

Class representing a State payload

Parameters:
  • timestamp (int) – timestamp at message sending time

  • online (bool) – whether or not the primary host application is online

classmethod decode(raw, *, birth=None)[source]

Construct a State object from bytes

Parameters:
  • raw (bytes) – bytes to decode into a State object

  • birth (Birth | None) – unused input since States don’t have any metrics

Returns:

State object

Return type:

Self

encode(*, include_dtypes=False)[source]

Encode State object into bytes

Parameters:

include_dtypes (bool) – unused input since States have no metrics

Returns:

encoded payload in bytes

Return type:

bytes

Config Classes

class pysparkplug.ClientOptions(max_inflight_messages=20, max_queued_messages=0, message_retry_timeout=5, reconnection_delay_min=1, reconnection_delay_max=120, reconnect_on_failure=True)[source]

Class of optional settings for an MQTT client

Parameters:
  • max_inflight_messages (int) – maximum number of messages with QoS>0 that can be part way through their network flow at once. Increasing this value will consume more memory but can increase throughput.

  • max_queued_messages (int) – maximum number of outgoing messages with QoS>0 that can be pending in the outgoing message queue. 0 means unlimited, although the implementation currently limits this to 65555 (65535 messages in queue + 20 in flight). When the queue is full, any further outgoing messages are dropped.

  • message_retry_timeout (int) – time in seconds before a message with QoS>0 is retried, if the broker does not respond. This is set to 5 seconds by default and should not normally need changing.

  • reconnection_delay_min (int) – when the connection is lost, the client automatically retries the connection. The first attempt is delayed by reconnection_delay_min seconds, and the delay doubles between subsequent attempts, up to reconnection_delay_max.

  • reconnection_delay_max (int) – see reconnection_delay_min.

  • reconnect_on_failure (bool) – whether or not to reconnect the client on failure

Returns:

a ClientOptions object
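
The reconnection delay described above (start at reconnection_delay_min, double after each attempt, cap at reconnection_delay_max) can be sketched as a plain function. reconnect_delays is a hypothetical helper for illustration only; it computes the schedule and does not touch the client.

```python
from typing import List


def reconnect_delays(
    delay_min: int = 1, delay_max: int = 120, attempts: int = 10
) -> List[int]:
    """Return the delay in seconds before each of the first `attempts` retries."""
    if delay_min <= 0 or delay_max < delay_min:
        raise ValueError("expected 0 < delay_min <= delay_max")
    delays: List[int] = []
    delay = delay_min
    for _ in range(attempts):
        delays.append(delay)
        # Double the delay for the next attempt, but never exceed the cap
        delay = min(delay * 2, delay_max)
    return delays
```

With the default values of 1 and 120 seconds, the schedule reaches the cap on the eighth attempt and stays there.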

class pysparkplug.TLSConfig(ca_certs=None, certfile=None, keyfile=None, cert_reqs=VerifyMode.CERT_REQUIRED, tls_version=_SSLMethod.PROTOCOL_TLS, ciphers=None)[source]

TLS configuration class

Parameters:
  • ca_certs (str | None) – a string path to the Certificate Authority certificate files that are to be treated as trusted by this client. If this is the only option given then the client will operate in a similar manner to a web browser. That is to say it will require the broker to have a certificate signed by the Certificate Authorities in ca_certs and will communicate using TLS v1.2, but will not attempt any form of authentication. This provides basic network encryption but may not be sufficient depending on how the broker is configured.

  • certfile (str | None) – string pointing to the PEM encoded client certificate. If this argument is not None then it will be used as client information for TLS based authentication. Support for this feature is broker dependent. Note that if this file is encrypted and needs a password to decrypt it, Python will ask for the password at the command line. It is not currently possible to define a callback to provide the password.

  • keyfile (str | None) – string pointing to the PEM encoded private keys. If this argument is not None then it will be used as client information for TLS based authentication. Support for this feature is broker dependent. Note that if this file is encrypted and needs a password to decrypt it, Python will ask for the password at the command line. It is not currently possible to define a callback to provide the password.

  • cert_reqs (VerifyMode) – defines the certificate requirements that the client imposes on the broker. By default this is ssl.CERT_REQUIRED, which means that the broker must provide a certificate. See the ssl pydoc for more information on this parameter.

  • tls_version (_SSLMethod) – specifies the version of the SSL/TLS protocol to be used. By default (if the Python version supports it) the highest TLS version is detected. If unavailable, TLS v1.2 is used. Earlier versions (all versions beginning with SSL) are possible but not recommended due to possible security problems.

  • ciphers (str | None) – a string specifying which encryption ciphers are allowable for this connection, or None to use the defaults. See the ssl pydoc for more information.

Returns:

a TLSConfig object
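
To make the cert_reqs default concrete, the sketch below uses only Python's standard ssl module to build a client-side context that requires and verifies the broker's certificate. It illustrates what ssl.CERT_REQUIRED means and is not how pysparkplug builds its TLS context internally; make_verifying_context is a hypothetical helper.

```python
import ssl
from typing import Optional


def make_verifying_context(ca_certs: Optional[str] = None) -> ssl.SSLContext:
    """Build a client-side context that requires a valid broker certificate.

    `ca_certs` is an optional path to a CA bundle; when it is None, the
    system's default trust store is used instead.
    """
    context = ssl.create_default_context(cafile=ca_certs)
    # create_default_context already sets these; they are repeated for clarity
    context.check_hostname = True
    context.verify_mode = ssl.CERT_REQUIRED
    return context
```

Relaxing verify_mode to ssl.CERT_NONE would still encrypt the connection but would no longer authenticate the broker, which is why the documented default is ssl.CERT_REQUIRED.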

class pysparkplug.WSConfig(path='/mqtt', headers=None)[source]

Websockets configuration class

Parameters:
  • path (str) – the MQTT path to use on the broker.

  • headers (Dict[str, Any] | Callable[[Dict[str, Any]], Dict[str, Any]] | None) – either a dictionary specifying a list of extra headers which should be appended to the standard websocket headers, or a callable that takes the normal websocket headers and returns a new dictionary with a set of headers to connect to the broker.

Returns:

a WSConfig object
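
The headers parameter accepts either a dictionary of extra headers or a callable that receives the standard headers and returns the final set. The sketch below shows one way those two forms could be resolved into a single dictionary. resolve_headers and the type aliases are hypothetical names, and the actual merging is done by the underlying MQTT client rather than by this function.

```python
from typing import Any, Callable, Dict, Union

Headers = Dict[str, Any]
HeadersOption = Union[Headers, Callable[[Headers], Headers], None]


def resolve_headers(standard: Headers, option: HeadersOption) -> Headers:
    """Combine the standard websocket headers with a user-supplied option."""
    if option is None:
        return dict(standard)
    if callable(option):
        # The callable sees a copy of the standard headers and returns the final set
        return option(dict(standard))
    # A dictionary is appended to (and may override) the standard headers
    merged = dict(standard)
    merged.update(option)
    return merged
```

The callable form is the more flexible of the two, since it can also remove or rewrite a standard header rather than only add new ones.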

Enums

class pysparkplug.ConnackCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

MQTT Connection Acknowledgement codes

class pysparkplug.DataType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Enumeration of Sparkplug B datatypes

decode(value)[source]

Decode a value from the form it takes in a Sparkplug B Protobuf object

Parameters:

value (int | float | bool | str | bytes)

Return type:

int | float | bool | str | bytes | datetime

encode(value)[source]

Encode a value into the form it should take in a Sparkplug B Protobuf object

Parameters:

value (int | float | bool | str | bytes | datetime)

Return type:

int | float | bool | str | bytes
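
The signatures of decode and encode differ only in datetime, which suggests that datetime values are the ones converted on the way in and out of Protobuf. In Sparkplug B, a DateTime value is carried as an integer number of milliseconds since the Unix epoch. The sketch below shows that conversion with hypothetical helper names; it assumes timezone-aware UTC datetimes, which this reference does not confirm for pysparkplug.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_datetime(value: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if value.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding
    return (value - EPOCH) // timedelta(milliseconds=1)


def decode_datetime(value: int) -> datetime:
    """Convert milliseconds since the Unix epoch to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=value)
```

Sub-millisecond precision is dropped by the encoding, so a round trip preserves a datetime only to the nearest millisecond below it.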

property field: str[source]

The Protobuf field the data is encoded in

class pysparkplug.ErrorCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

MQTT error codes

class pysparkplug.MessageType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Sparkplug B message type enum

property payload: type[source]

Returns the payload class for this message type

class pysparkplug.MQTTProtocol(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

MQTT protocol enum

class pysparkplug.QoS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

MQTT quality of service enum
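
The long signature shown for each class in this section is the standard call signature of Python's enum.Enum; in practice only the value argument is used, to look up an existing member. The sketch below demonstrates this with a stand-in enum. QoSLevel and its member names are hypothetical and may not match the member names of pysparkplug.QoS; only the numeric levels 0, 1 and 2 come from the MQTT specification.

```python
from enum import IntEnum


class QoSLevel(IntEnum):
    """Stand-in enum for the three MQTT quality of service levels."""

    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


# Calling the enum class with a value returns the matching member
level = QoSLevel(1)
```

Calling the class with a value that has no member, such as QoSLevel(3), raises ValueError.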

class pysparkplug.Transport(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

MQTT transport enum

Odds & Ends

pysparkplug.get_current_timestamp()[source]

Returns current time in a Sparkplug B compliant format

Return type:

int
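
Sparkplug B timestamps are integers counting milliseconds since the Unix epoch in UTC. A minimal equivalent using only the standard library might look like the sketch below; current_timestamp_ms is a hypothetical name, and the library's own function may be implemented differently.

```python
import time


def current_timestamp_ms() -> int:
    """Return the current time as milliseconds since the Unix epoch."""
    # time_ns() returns an int, so no floating-point rounding is involved
    return time.time_ns() // 1_000_000
```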

pysparkplug.MetricValue[source]

Type annotation for the types a Metric’s value attribute can take

exception pysparkplug.MQTTError[source]

Error from MQTT client

pysparkplug.SINGLE_LEVEL_WILDCARD = '+'[source]

Constant for single-level MQTT topic wildcard

Uses type annotation for compatibility with Topic objects.

Added in version 0.2.0.

pysparkplug.MULTI_LEVEL_WILDCARD = '#'[source]

Constant for multi-level MQTT topic wildcard

Uses type annotation for compatibility with Topic objects.

Added in version 0.2.0.
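
To show what the two wildcards mean on the broker side, the sketch below implements simplified MQTT topic filter matching: '+' matches exactly one topic level, and '#' matches the parent level plus any number of levels below it and must be the last level of the filter. topic_matches is a hypothetical helper, not part of pysparkplug, and it ignores the special handling of topics that start with '$'.

```python
SINGLE = "+"  # same value as SINGLE_LEVEL_WILDCARD
MULTI = "#"  # same value as MULTI_LEVEL_WILDCARD


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter` (simplified)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == MULTI:
            # '#' is only valid as the final level and matches everything below
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != SINGLE and level != topic_levels[i]:
            return False
    # Without a trailing '#', both must have the same number of levels
    return len(filter_levels) == len(topic_levels)
```

For example, the filter spBv1.0/+/NDATA/+ would match NDATA messages from every edge node in every group, while spBv1.0/plant1/# would match every message in the plant1 group.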