1. Introduction
This is the Prosys OPC UA SDK for Java tutorial on using the PubSub features. After reading it, you should be able to implement your own PubSub applications with the features provided by the SDK. The functionality is defined in the OPC UA Specification 1.04 Part 14 "PubSub".
The PubSub Edition has been available since SDK version 4.5.0. Several PubSub features are still not supported. If you are interested in a specific feature or have other inquiries, please contact the Prosys team and we’ll see how we can help you.
PubSub involves two parties: a Publisher, which sends data, and a Subscriber, which receives it. An application can work as both a Publisher and a Subscriber. Neither one has to be an OPC UA application as such, though typically at least one side is.
The basic premise in the OPC UA Specification Part 14 is that the Publisher or Subscriber would be an OPC UA Server and the address space of the server is the source and/or target of the data. In the context of our SDK, the same applies to the connected server when the Publisher or Subscriber would be an OPC UA Client. You can also receive the data via an event-system and ignore defining any target node.
The OPC UA PubSub transport configuration consists of Transport Protocol and Message Encoding.
For the Transport Protocol, OPC UA PubSub defines:
- Datagram Transport Protocol, which is mapped to OPC UA UDP and OPC UA Ethernet
- Broker Transport Protocol, which is mapped to MQTT and AMQP
The Prosys OPC UA SDK for Java currently supports the following transport facets:
- PubSub UDP UADP (Publisher and Subscriber)
- PubSub MQTT UADP (Publisher and Subscriber)
- PubSub MQTT JSON (Publisher and Subscriber)
Publishing event-datasets is not yet supported with UaClient-contexted PubSub.
UADP (UA Datagram Protocol) is the binary message mapping for PubSub. JSON is an alternative clear-text message format.
UDP transport is mainly targeting controller-to-controller communication using UDP multicast or unicast transport. It works with UADP messages (therefore UDP UADP).
MQTT transport mostly targets communication with cloud systems. The SDK supports both UADP and JSON messages with MQTT. The supported MQTT versions are MQTT 3.1 and 3.1.1. With MQTT, messages typically go from the Publisher to a Broker, which then delivers them to the Subscribers. OPC UA defines both a reversible and a non-reversible JSON encoding format. The latter is easier for generic MQTT subscribers to use, whereas the former includes more OPC UA meta data.
AMQP transport is not implemented.
1.1. Security
There are 2 different security levels in PubSub. For UDP-UADP and MQTT-UADP there is security defined for UADP in the specification. This is not yet supported by the Prosys OPC UA SDK for Java. There is separate security for MQTT in the form of SSL connections to the Broker. This is supported by the SDK (see SampleBroker + SSL).
2. Installation
You need a PubSub-capable edition of the SDK in order to use PubSub. You can use one of the following distributions:
- prosys-opc-ua-sdk-for-java-VERSION-evaluation.zip/tar.gz
- prosys-opc-ua-sdk-for-java-VERSION-client-server-pubsub-binary.zip/tar.gz
- prosys-opc-ua-sdk-for-java-VERSION-client-server-pubsub-source.zip/tar.gz
Please contact Prosys OPC support or sales to get one if you don’t have it yet.
3. Sample Applications
PubSub editions include sample applications that demonstrate how the PubSub SDK is used in practice:
- SamplePublisherServer, a sample OPC UA Server that also operates as a PubSub Publisher.
- SamplePublisherClient, a sample OPC UA Client that also operates as a PubSub Publisher.
- SampleSubscriberServer, a sample OPC UA Server that also operates as a PubSub Subscriber.
- SampleSubscriberClient, a sample OPC UA Client that also operates as a PubSub Subscriber.
- SampleBroker, a simple MQTT Broker that showcases the PubSub MQTT transport protocol mappings.
The SamplePublisherServer publishes data changes of the MyLevel variable and MyLevelAlarm events from MyDevice node of the server. The Subscribers receive these values and you can see on the console how the value of MyLevel is changing and when events with message "Level exceeded" have been sent.
SamplePublisherClient monitors MyLevel on an OPC UA Server (SampleConsoleServer or SamplePublisherServer, for example) and publishes the value onward as a PubSub Publisher.
SampleSubscriberServer is showing how to synchronize data between two servers. It is monitoring data changes to MyLevel as a Subscriber from the Publisher and updates MyPubSubTargetVariable in its own address space accordingly. This allows you to connect to the server with an OPC UA client to monitor these changes as well.
SampleSubscriberClient shows how to incorporate OPC UA Subscriber within an OPC UA Client. It is connecting to the SamplePublisherServer as a client and also monitors data changes and events as a Subscriber just to display the changes.
By default the samples use UDP transport with UADP messages. You can use SampleBroker to run the samples with MQTT. Alternatively to UADP, you can also use JSON messages with MQTT.
The samples are used similarly to the other samples in the SDK. You can compile and start them using the included script files (.bat/.sh) or use the pom.xml files for Maven integration.
Generally it is best to create a project within your IDE, for example as explained in the Prosys_OPC_UA_SDK_for_Java_Starting_Guide, and run the applications from there.
- To run a basic test with UDP transport and UADP messages, start both the SamplePublisherServer and SampleSubscriberServer and select a common Network Interface at start.
- To run with MQTT transport, start the SampleBroker first. Then start both the SamplePublisherServer and SampleSubscriberServer with the command line option '-a' (or '--address'):
  samplepublisherserver.bat -a mqtt://localhost:1883
- To use JSON messages with MQTT, also use the command line option '-e' (or '--encoding'):
  samplepublisherserver.bat -a mqtt://localhost:1883 -e json
An application can operate as both a Publisher and a Subscriber, but in the samples they are separated for ease of use.
3.1. SampleBroker + SSL
The SampleBroker can be run in "SSL mode", in which the connections to the broker are secured with SSL. This requires that certificates are generated for the Broker (the keystore of the Broker). In addition, it is possible to require client side authentication, i.e. that the client’s SSL certificate must be in the truststore of the Broker.
The following CLI flags can be used to start SampleBroker in SSL mode and also require client side authentication:
-a "mqtt+ssl://localhost:8883?needClientAuth=true" --ssl --ssl-keystore "PATH_TO_KEYSTORE.jks" --ssl-keystore-store-password KEYSTORE_PASSWORD --ssl-keystore-keys-password BROKER_KEY_PASSWORD --ssl-truststore "PATH_TO_TRUSTSTORE.jks" --ssl-truststore-password TRUSTSTORE_PASSWORD
PATH_TO_KEYSTORE.jks must be a (preferably absolute) path to a Java keystore that contains the certificate of the Broker. The keystore is typically password protected; its password is given with KEYSTORE_PASSWORD. The individual private key of the Broker within that store might have a different password, which is given with BROKER_KEY_PASSWORD.
PATH_TO_TRUSTSTORE.jks must be a (preferably absolute) path to a Java keystore that contains the truststore of the Broker. With client side authentication required, this store must contain the public key of the client connecting to the Broker. The store is typically password protected; its password is given with TRUSTSTORE_PASSWORD.
The SampleBroker is just a sample to show that PubSub works with MQTT. In a real application, passwords should NEVER be passed in via the command line (see e.g. https://www.netmeister.org/blog/passing-passwords.html). The arguments are used here only to provide a way to use something other than the hardcoded default paths and passwords. A real application should typically use a configuration file or a secure storage option.
3.1.1. Generating the keystores and certificates
These instructions work with the SampleBroker. Please consult the documentation of your real Broker to see how the same steps are done there.
These instructions assume that the 'keytool' (from a JDK installation) and 'openssl' commands are available on your system. On non-Windows platforms, 'openssl' is typically available or easily installed via package managers. On Windows, it is typically available within a 'git bash' terminal, assuming you use 'Git' for version control. Depending on the versions of the tools you have, the process might be slightly different.
The openssl commands listed below have 'winpty' at the start, since this might be required when running within a 'git bash' terminal on Windows. On other operating systems you can run the commands without the 'winpty' at the start.
This creates the private (+public) key of the Broker (note that it will ask some questions, see below for recommended answers for the purpose of this tutorial):
keytool -genkey -keyalg RSA -alias broker -keystore broker-keystore.jks -storepass password-keystore -validity 360 -keysize 2048 -ext san=dns:HOSTNAME,dns:localhost,ip:127.0.0.1
The san=… information must match the machine on which the SampleBroker will be run (i.e. what address the clients use to connect to it). Replace the HOSTNAME with the hostname of your machine.
- What is your first and last name?
  Answer with the hostname of the machine here as well.
- What is the name of your organizational unit?
  Can be skipped (just press Enter) for the purposes of this sample.
- What is the name of your organization?
  Can be skipped (just press Enter) for the purposes of this sample.
- What is the name of your City or Locality?
  Can be skipped (just press Enter) for the purposes of this sample.
- What is the name of your State or Province?
  Can be skipped (just press Enter) for the purposes of this sample.
- What is the two-letter country code for this unit?
  Use the country code of the country where the broker runs (the authors are from Finland, so 'FI' for us).
- Is CN=HOSTNAME_HERE, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=COUNTRY_CODE_HERE correct?
  Answer "yes". Note that some versions of the tool might not ask this.
- Enter key password for <broker> (RETURN if same as keystore password)
  Enter the password to be used for the individual key; for the purposes of this example we use "password-key". Then re-enter the same password. Note that some keytool versions do not ask this question, in which case the key and the store share the same password.
Ignore the "The JKS keystore uses a proprietary format." warning.
For the client certificates (which will be used to connect to the Broker), we will reuse the OPC UA ApplicationInstanceCertificate of the samplesubscriberserver and samplepublisherserver for the purposes of this tutorial:
winpty openssl x509 -inform der -in "PATH_TO_SDK_UNZIPPED_DISTRIBUTION\samples\samplesubscriberserver\PKI\CA\private\SampleSubscriberServer@HOSTNAME_2048.der" -out mqtt-client-cert1.crt
winpty openssl x509 -inform der -in "PATH_TO_SDK_UNZIPPED_DISTRIBUTION\samples\samplepublisherserver\PKI\CA\private\SamplePublisherServer@HOSTNAME_2048.der" -out mqtt-client-cert2.crt
Replace the HOSTNAME with the hostname of your machine.
Then they are imported to the truststore of the Broker:
keytool -import -file mqtt-client-cert1.crt -alias client1 -keystore broker-truststore.jks -storepass password-truststore
keytool -import -file mqtt-client-cert2.crt -alias client2 -keystore broker-truststore.jks -storepass password-truststore
Answer the "Trust this certificate? [no]:" prompt with "yes" for both.
Now you should have the files broker-keystore.jks and broker-truststore.jks.
Next we will export the public key of the broker certificate, to be used on the client side (so that the client side trusts the Broker):
keytool -exportcert -alias broker -keystore broker-keystore.jks -rfc -file server.pem
(use the password-keystore for the keystore)
This server.pem is then used as a CLI parameter for the PubSub samples that connect to the Broker via MQTT+SSL:
--ca-cert "PATH_TO_server.pem"
Note that the parameter is named --ca-cert since it would most likely be a CA certificate (which has been used to sign the keys of the Broker), but in this sample it is the public key directly.
Currently you must give the MQTT URL in the format ssl:// instead of mqtt:// when giving the address to the other PubSub samples, i.e. to the MQTT connection Conf objects.
This server.pem is the public key of the broker. The .pem extension is just a file format, but note that in the rest of the SDK a .pem file has typically meant a private key (as they are typically stored as such, while we have used .der for the public keys).
4. Configuring PubSub
In this chapter we’ll go through the configuration of the PubSubSystem.
We strongly recommend that you explore and experiment with the samples mentioned in Sample Applications, but most steps are also explained here.
The PubSub configuration is grouped into various methods in the class SamplePubSubConfiguration. It also handles the command line options for the samples, but for the most part the configuration is rather fixed. In your own applications, you can prepare the configuration to your needs using the various configuration objects that are explained in the sections below.
4.1. PubSubSystem
Everything in the PubSub functionality is defined under the main PubSubSystem class.
The PubSubSystem is configured with PubSubSystemConf, which contains separate configuration objects for the various parts. The configuration objects are immutable and they are created with the respective Builder objects (so PubSubSystemConf.Builder in this case).
The complete configuration is done in SamplePubSubConfiguration.createPubSubSystem(), which is called from the run method of each sample application. We will break that method into pieces in the following chapters.
/**
 * Creates a PubSubSystem with a sample configuration for a variable and event dataset
 * corresponding to variableId and eventNotifierId.
 *
 * @param application UaServer or UaClient for which the PubSubSystem is created for
 * @param pubSubMode {@link PubSubMode#PUBLISHER} or {@link PubSubMode#SUBSCRIBER}
 * @param variableId a single variable to use for defining a Variable Dataset
 * @param eventNotifierId an object to use for defining an event dataset. Note that event datasets
 *        are not available for client applications, yet.
 * @param targetVariableId a target variable in the address space of an UaServer to which the
 *        variableId is mapped to
 * @return The PubSubSystem to use with the application.
 */
public PubSubSystem createPubSubSystem(UaApplication application, PubSubMode pubSubMode, UaNodeId variableId,
    UaNodeId eventNotifierId, UaNodeId targetVariableId) {
  if (application instanceof UaClient) {
    // The eventNotifierId and targetVariableId only work for UaServers
    if (eventNotifierId != null) {
      throw new IllegalArgumentException("Cannot initialize an event dataset for UaClient");
    }
    if (targetVariableId != null) {
      throw new IllegalArgumentException("Cannot initialize targetVariable for UaClient");
    }
  }

  /*
   * The PubSubSystem is configured with PubSubSystemConf, which contains separate configuration
   * objects for the various parts. The configuration objects are immutable and they are created
   * with the respective Builder objects.
   */
  PubSubSystemConf.Builder builder = PubSubSystemConf.builder();

  // Ignore if loading from a file and not saving to one
  if (loadFile == null || saveFile != null) {
    // The DataSets define what is published
    if (variableId != null) {
      variableDataSet = addPublishedVariableDataSet(builder, variableId);
    }
    if (eventNotifierId != null) {
      eventDataSet = addPublishedEventDataSet(builder, eventNotifierId);
    }

    // targetVariableId is used in createTargetVariables()
    this.targetVariableId = targetVariableId;

    // The connection defines how it is published
    addConnection(builder, pubSubMode);
  }

  PubSubSystemConf pubSubSystemConf = builder.build();

  // If save option used, persist the configuration to file
  if (saveFile != null) {
    try {
      SamplePubSubConfiguration.save(saveFile, pubSubSystemConf, application.getEncoderContext());
    } catch (Exception e) {
      logger.error("Could not save PubSub configuration to file: {}", saveFile, e);
      throw new RuntimeException("Could not save PubSub configuration");
    }
  }

  // If load option used, replace the configuration from the file
  if (loadFile != null) {
    try {
      pubSubSystemConf = SamplePubSubConfiguration.load(loadFile, application.getEncoderContext());
    } catch (Exception e) {
      logger.error("Could not load PubSub configuration from file: {}", loadFile, e);
      throw new RuntimeException("Could not load PubSub configuration");
    }
  }

  // MQTT requires special settings, which are currently provided via PubSubSystemOverrides
  // Note, for the time being, SDK doesn't have built-in support for persisting these in a file.
  PubSubSystemOverrides pubSubSystemOverrides = createMqttOverrides(application);

  // The PubSubSystem is created in the context of an UaApplication (either UaClient or UaServer)
  return PubSubSystem.createFor(application, pubSubSystemConf, pubSubSystemOverrides);
}
The following chapters describe the parts of the configuration that need to be defined. These include the DataSets, which should match in the Publisher and Subscriber applications, and the Connections, whose connection parameters must also match but whose Writer and Reader configurations differ between the Publisher and the Subscriber.
Once the configuration is finished, the PubSubSystem is created for an UaServer (or UaClient) object with PubSubSystem.createFor, as seen above.
4.2. DataSets
The DataSets define what is published in a Publisher. Both data changes and events can be published, although at the moment client applications cannot publish events.
4.2.1. Variable DataSet
The Variable DataSet is used to configure a fixed set of variables that is published or subscribed to.
private PubSubVariableDataSetConf addPublishedVariableDataSet(PubSubSystemConf.Builder conf, UaNodeId variableId) {
  PubSubVariableDataSetConf.Builder builder = PubSubVariableDataSetConf.builder();

  // The Dataset is identified by its unique name
  builder.setName(SAMPLE_VARIABLE_DATASET_NAME);

  /*
   * By default, the dataset will monitor the Value attribute of the variable. It will by default
   * use data from the node for the metadata of the Field and the name part of the BrowseName
   * Attribute for the field's name.
   */
  builder.addPublishedVariables(variableId);

  // This example shows how to monitor the DisplayName attribute
  PubSubPublishedVariableConf.Builder variableConfBuilder = PubSubPublishedVariableConf.builder();
  variableConfBuilder.setPublishedVariable(variableId);
  variableConfBuilder.setAttributeId(Attributes.DisplayName);

  /*
   * You can also override the name and description if the ones of the node are not suitable. For
   * non-Value Attributes by default the name of the Attribute is appended to the name part of the
   * BrowseName.
   */
  variableConfBuilder.setName("MySecondPublishedField");
  variableConfBuilder.setDescription(new LocalizedText("Description for second field"));
  builder.addPublishedVariables(variableConfBuilder.build());

  PubSubVariableDataSetConf r = builder.build();
  conf.addOrReplaceDataSet(r);
  return r;
}
The DataSet is identified by a unique name within the PubSubSystem.
The variables must be added to the DataSet using UaNodeId. This is a bit different from the NodeId that the Client/Server parts of the SDK use. The main difference is that it is always configured with a NamespaceUri instead of a NamespaceIndex. There are conversion methods between them, but in the samples we can simply create the value corresponding to our MyLevel variable as follows:
public static final String SAMPLE_NAMESPACE = "http://www.prosysopc.com/OPCUA/SampleAddressSpace";
UaNodeId variableId = UaNodeId.from(SAMPLE_NAMESPACE, "MyLevel");
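To illustrate why a NamespaceUri-based identifier is convenient for PubSub, here is a minimal sketch that does not use the SDK at all. The class NamespaceTableSketch, its method and the two server tables are hypothetical and only model a namespace table as an ordered list of URIs. The only OPC UA fact assumed is that index 0 is always the OPC UA namespace http://opcfoundation.org/UA/. The sketch shows that the same namespace URI can resolve to different indexes on different servers, so an index-based id is only meaningful relative to one server's table.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Prosys SDK: models a server's namespace table
// as an ordered list of URIs, where the list position is the NamespaceIndex.
final class NamespaceTableSketch {

  // Returns the NamespaceIndex of the given URI in the table, or -1 if the URI is unknown.
  static int indexOf(List<String> namespaceTable, String namespaceUri) {
    return namespaceTable.indexOf(namespaceUri);
  }

  public static void main(String[] args) {
    String sample = "http://www.prosysopc.com/OPCUA/SampleAddressSpace";
    // Two assumed servers that both know the sample namespace, registered in a different order
    List<String> serverA = Arrays.asList("http://opcfoundation.org/UA/", "urn:example:serverA", sample);
    List<String> serverB = Arrays.asList("http://opcfoundation.org/UA/", sample);
    System.out.println("Index on server A: " + indexOf(serverA, sample));
    System.out.println("Index on server B: " + indexOf(serverB, sample));
  }
}
```

Because a Publisher and a Subscriber may each be connected to a different server, identifying nodes by URI avoids having to agree on index values.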
By default, the DataSet will monitor changes in the Value attributes of the variables. If you wish to send notifications for other attributes, you can use the PubSubPublishedVariableConf as seen above.
In addition to the AttributeId, the PubSubPublishedVariableConf can be used to specify other details, such as the Deadband and SamplingIntervalHint, although these are not yet supported (will be added in a later version).
4.2.2. Event DataSet
To enable publishing OPC UA events, we will use a builder for PubSubEventDataSetConf:
private PubSubEventDataSetConf addPublishedEventDataSet(PubSubSystemConf.Builder conf, UaNodeId eventNotifierId) {
  PubSubEventDataSetConf.Builder builder = PubSubEventDataSetConf.builder();

  // The Dataset is identified by its unique name
  builder.setName(SAMPLE_EVENT_DATASET_NAME);
  builder.setEventNotifier(eventNotifierId);

  // The SelectClauses define which event fields are added to the Dataset
  builder.addSelectClauses(UaSimpleAttributeOperand
      .from(UaBrowseNamePath.from(Ids.BaseEventType, UaQualifiedName.standard("Message")), Attributes.Value));
  builder.addSelectClauses(UaSimpleAttributeOperand
      .from(UaBrowseNamePath.from(Ids.BaseEventType, UaQualifiedName.standard("EventId")), Attributes.Value));

  PubSubEventDataSetConf r = builder.build();
  conf.addOrReplaceDataSet(r);
  return r;
}
The server samples use the Server object as the source of events. We can define its UaNodeId as follows:
UaNodeId SERVER_NODE_ID = UaNodeId.fromStandard(ObjectIdentifiers.Server);
For more information about the concepts of specifying OPC UA Events, see Prosys_OPC_UA_SDK_for_Java_Client_Tutorial.
4.3. Connection
The PubSub system must define at least one connection, via which the messages are sent to the network. Each connection consists of the transport protocol (UDP or MQTT) and message encoding (UADP or JSON). There are separate connection configuration classes for each of the three supported transport facets (i.e. combinations of transport protocol and message encoding): PubSubMqttJsonConnectionConf, PubSubMqttUadpConnectionConf and PubSubUdpUadpConnectionConf. The transport protocol and message encoding are combined as a PubSubTransportProfile.
The PubSubTransportProfile can serve as a factory for creating Builder(s) of that kind. The samples take the profile via command line arguments (or use PubSubTransportProfile.UDP_UADP by default). This is then used to create the Builder for the connection, groups and reader/writer configuration objects.
private void addConnection(PubSubSystemConf.Builder conf, PubSubMode pubSubMode) {
  // The defined PubSubTransportProfile serves as a factory to create profile-based
  // implementations of Builders for the configuration objects
  PubSubConnectionConf.Builder<?> builder = transportProfile.createConnectionConfBuilder();
  builder.setName(SAMPLE_CONNECTION_NAME);

  if (PubSubMode.PUBLISHER == pubSubMode) {
    builder.setPublisherId(publisherId);
  }

  if (builder instanceof PubSubUdpUadpConnectionConf.Builder) {
    PubSubUdpUadpConnectionConf.Builder b = (PubSubUdpUadpConnectionConf.Builder) builder;
    // UDP specific settings
    InetSocketAddress inetAddress = getInetAddress(pubSubAddress);
    b.setNetworkAddress(inetAddress);
    // Network Interface must be defined for UDP multicast connections
    b.setNetworkInterface(networkInterface);
  }

  if (builder instanceof PubSubBrokerConnectionConf.Builder) {
    PubSubBrokerConnectionConf.Builder<?> b = (PubSubBrokerConnectionConf.Builder<?>) builder;
    // MQTT specific settings
    b.setNetworkAddress(pubSubAddress);
  }

  if (PubSubMode.PUBLISHER == pubSubMode) {
    addWriterGroup(builder);
  }
  if (PubSubMode.SUBSCRIBER == pubSubMode) {
    addReaderGroup(builder);
  }

  conf.addOrReplaceConnection(builder.build());
}
The PublisherId is defined only for a Publisher. The Subscribers can define it for the DataSet Reader, if they wish to use it for filtering incoming messages.
The PublisherId should be unique in the network so that the Subscribers can recognize the Publishers by it. The value can be a String or an unsigned number (Byte, UnsignedShort, UnsignedInteger or UnsignedLong). Null and 0 are not valid values.
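The value rules above can be sketched as a small check. This is an illustration only: PublisherIdCheck is a hypothetical helper and not part of the SDK, and it models the unsigned OPC UA types with plain Java integer types (plus BigInteger for the full unsigned 64-bit range) instead of the SDK's own unsigned classes. How an empty String should be treated is not stated above, so the sketch accepts any non-null String.

```java
import java.math.BigInteger;

// Hypothetical helper, not part of the Prosys SDK. Unsigned OPC UA numbers are modelled
// here with plain Java integer types, and BigInteger for the full unsigned 64-bit range.
final class PublisherIdCheck {

  static boolean isValid(Object publisherId) {
    if (publisherId == null) {
      return false; // null is not a valid PublisherId
    }
    if (publisherId instanceof String) {
      return true; // any non-null String is accepted in this sketch
    }
    if (publisherId instanceof BigInteger) {
      return ((BigInteger) publisherId).signum() > 0; // rejects 0 and negative values
    }
    if (publisherId instanceof Byte || publisherId instanceof Short || publisherId instanceof Integer
        || publisherId instanceof Long) {
      return ((Number) publisherId).longValue() > 0; // rejects 0 and negative values
    }
    return false; // neither a String nor an integer number
  }
}
```

A check like this could be run on a configured or command line supplied ID before it is handed to the connection builder.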
By default, the samples use a PublisherId that includes the actual hostname of the computer on which they are run. This makes it possible to identify several publishers in the same network. You can also provide a custom ID with the command line option --publisher-id for the sample applications.
The connections also need to define the NetworkAddress to send or listen to. The default UDP multicast address that we use is "opc.udp://224.0.5.1:4840". The address is given as InetSocketAddress, see the getInetAddress method for more details. Note that for multicast addresses you must also define which NetworkInterface is used (as the address doesn’t map uniquely to one). You can check out the method selectNetworkInterface in the sample code to see how to choose from the available Network Interfaces.
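As a rough illustration of the address handling described above, the following sketch converts an opc.udp URL into an InetSocketAddress and checks whether it is a multicast address, in which case a NetworkInterface would also be needed. It uses only the Java standard library. UdpAddressSketch and its methods are hypothetical names; this is not the getInetAddress method of the samples, whose implementation may differ.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical helper, not the sample's getInetAddress implementation.
final class UdpAddressSketch {

  // Parses a URL of the form opc.udp://host:port into an InetSocketAddress.
  static InetSocketAddress parse(String url) {
    URI uri = URI.create(url);
    if (!"opc.udp".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() < 0) {
      throw new IllegalArgumentException("Expected opc.udp://host:port but got: " + url);
    }
    try {
      return new InetSocketAddress(InetAddress.getByName(uri.getHost()), uri.getPort());
    } catch (UnknownHostException e) {
      throw new IllegalArgumentException("Cannot resolve host in: " + url, e);
    }
  }

  // Multicast addresses do not map to a single interface, so one must be chosen explicitly.
  static boolean needsNetworkInterface(InetSocketAddress address) {
    return address.getAddress().isMulticastAddress();
  }
}
```

For example, parse("opc.udp://224.0.5.1:4840") yields a multicast address on port 4840, so needsNetworkInterface returns true for it, whereas a unicast address such as 192.168.0.10 would not require the extra setting.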
For MQTT, you define the NetworkAddress as a String, corresponding to the address of the Broker. For example, if you run the SampleBroker on the same computer, you would use "mqtt://localhost:1883".
For MQTT+SSL, you must give the MQTT URL in the format ssl:// instead of mqtt://.
4.3.1. Discovery functionality
Discovery messages enable the exchange of meta data between Publishers and Subscribers. PubSub defines discovery messages for both UADP and JSON message mappings. Currently the SDK supports only the DataSetMetaData discovery messages in UADP and JSON; Publisher Endpoint and DataSetWriter discovery are not yet supported.
DataSetMetaData messages contain information about the structure of the DataSet messages, helping Subscribers to decode the data they receive. For example, in section Variable DataSet we defined that our variable DataSet contains the Value and DisplayName attributes of MyLevel. The Value is changing all the time, whereas the DisplayName typically stays constant. Publishers don’t need to send values that are not changing. But when a binary mapped message contains only one value for a DataSet that includes two attributes, there needs to be a way to determine which value it is. For this reason, the variable fields in a DataSet are indexed in the respective DataSetMetaData to distinguish them from each other.
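The role of the field index can be illustrated with a conceptual sketch. It does not reproduce the actual UADP encoding or any SDK class: DeltaFrameSketch is a hypothetical name, the metadata is reduced to an ordered list of field names, and a message that carries only changed values is reduced to a map from field index to value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only; not the UADP wire format and not part of the Prosys SDK.
final class DeltaFrameSketch {

  // Resolves (field index -> value) pairs against the ordered field names taken from the metadata.
  static Map<String, Object> resolve(List<String> fieldNames, Map<Integer, Object> changedFields) {
    Map<String, Object> named = new LinkedHashMap<>();
    for (Map.Entry<Integer, Object> entry : changedFields.entrySet()) {
      int index = entry.getKey();
      if (index < 0 || index >= fieldNames.size()) {
        throw new IllegalArgumentException("Field index not described by the metadata: " + index);
      }
      named.put(fieldNames.get(index), entry.getValue());
    }
    return named;
  }
}
```

With the field names "MyLevel" (index 0) and "MySecondPublishedField" (index 1) from the Variable DataSet example, a message carrying only index 0 resolves to a single entry for "MyLevel". Without the metadata, the Subscriber could not tell which of the two fields the value belongs to.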
In UADP the Publisher can be configured to send DataSetMetaData messages when the DataSetMetaData changes. In MQTT it can also be sent periodically. Subscribers can also request DataSetMetaData on demand from the Publisher. When using JSON message mapping with MQTT, the DataSetMetaData messages can be sent to their own queue on a broker. Additionally, as mentioned in DataSet Reader, a DataSetMetaData can be assigned manually to a Subscriber if the message structure is known.
By default, Discovery messages are sent to the same NetworkAddress that was specified with setNetworkAddress and you don’t need to do anything special for the configuration.
4.4. Publisher
The Publisher specific configuration consists of Writer Groups and DataSet Writers.
4.4.1. Writer Group
Writer Groups write the Network Messages that are published by the Publisher. A single Network Message may contain one or more DataSet Messages from one or more DataSet Writers. Thus, for example, when we send a Network Message, it may contain data from both the VariableDataSet and EventDataSet that we configured.
The Writer Group runs in cycles of PublishingInterval. During each PublishingInterval, at SamplingOffset, data is first sampled from the configured DataSet Writers. Next, at the time specified by PublishingOffset, the sampled data is written into a single NetworkMessage together with the accompanying meta data and published.
Writer Groups are specific to the message encoding, so we have a separate initialization for UADP and JSON.
private void addWriterGroup(PubSubConnectionConf.Builder<?> connection) {
  PubSubWriterGroupConf.Builder<?> builder = connection.getTransportProfile().createWriterGroupConfBuilder();
  builder.setName(SAMPLE_WRITER_GROUP_NAME);
  builder.setWriterGroupId(SAMPLE_WRITER_GROUP_ID);
  builder.setPublishingInterval(Duration.ofMillis(publishingInterval));
  builder.setKeepAliveTime(Duration.ofSeconds(120));

  if (builder instanceof PubSubUadpWriterGroupConf.Builder) {
    initUadpWriterGroup((PubSubUadpWriterGroupConf.Builder<?>) builder);
  }
  if (builder instanceof PubSubJsonWriterGroupConf.Builder) {
    initJsonWriterGroup((PubSubJsonWriterGroupConf.Builder<?>) builder, reversibleJson);
  }
  if (builder instanceof PubSubBrokerTransportSettingsConf.Builder) {
    initBrokerTransportSettings((PubSubBrokerTransportSettingsConf.Builder<?>) builder, queueName, metadataQueueName);
  }

  if (variableDataSet != null) {
    addVariableDataSetWriter(builder);
  }
  if (eventDataSet != null) {
    addEventDataSetWriter(builder);
  }

  connection.addOrReplaceGroup(builder.build());
}
The init methods define the settings that are specific to the transport and message encoding:
private static void initUadpWriterGroup(PubSubUadpWriterGroupConf.Builder<?> builder) {
  // Configuration of meta data included in a NetworkMessage
  UadpNetworkMessageContentMask uadpNetworkMessageContentMask = UadpNetworkMessageContentMask.of(
      UadpNetworkMessageContentMask.PayloadHeader, UadpNetworkMessageContentMask.SequenceNumber,
      UadpNetworkMessageContentMask.DataSetClassId, UadpNetworkMessageContentMask.PublisherId,
      UadpNetworkMessageContentMask.WriterGroupId, UadpNetworkMessageContentMask.Timestamp,
      UadpNetworkMessageContentMask.GroupHeader, UadpNetworkMessageContentMask.NetworkMessageNumber);
  builder.setMessageContentMask(uadpNetworkMessageContentMask);

  // Set maximum Network Message size for written Network Messages. If message size exceeds the
  // limit, it is chunked into several messages. Note that 65535 is the maximum size allowed for
  // UDP connections.
  builder.setMaxNetworkMessageSize(UnsignedInteger.valueOf(4096));
}

private static void initJsonWriterGroup(PubSubJsonWriterGroupConf.Builder<?> builder, boolean reversibleJson) {
  // Configuration of meta data included in a NetworkMessage
  // Let's not include the headers to keep it simple
  JsonNetworkMessageContentMask jsonNetworkMessageContentMask =
      reversibleJson
          ? JsonNetworkMessageContentMask.of(JsonNetworkMessageContentMask.DataSetMessageHeader,
              JsonNetworkMessageContentMask.NetworkMessageHeader, JsonNetworkMessageContentMask.DataSetClassId,
              JsonNetworkMessageContentMask.PublisherId)
          : JsonNetworkMessageContentMask.of();
  builder.setMessageContentMask(jsonNetworkMessageContentMask);

  /*
   * JSON messages don't need a limit, though this is mainly since they are only usable with MQTT.
   */
  builder.setMaxNetworkMessageSize(UnsignedInteger.MAX_VALUE);
}

private static void initBrokerTransportSettings(PubSubBrokerTransportSettingsConf.Builder<?> builder,
    final String queueName, final String metadataQueueName) {
  // For MQTT connections we need to set the queueNames and some QoS parameters
  /*
   * IMPORTANT! For now the queue names are assumed to be non-wildcards.
   */
  builder.setQueueName(queueName);
  builder.setMetaDataQueueName(metadataQueueName);
  builder.setRequestedDeliveryGuarantee(BrokerTransportQualityOfService.AtLeastOnce);

  // Note that this is only needed in the Publisher side
  builder.setMetaDataUpdateTime(Duration.ofSeconds(30));
}
WriterGroupId identifies the Writer Group within a Publisher and can be used by the Subscribers to filter the messages (with UADP).
PublishingInterval defines the frequency of message dispatching.
In case no DataSet messages have been available for a time specified by KeepAliveTime, an empty Network Message is sent to notify the Subscribers that the Publisher is still functional.
A Network Message can be configured to include meta data, which can then be used for filtering or identification in the Subscribers. For example, SequenceNumber helps Subscribers to follow the correct order of the messages.
The NetworkMessageContentMask is defined differently for UADP and JSON messages.
If the size of the UDP Network Message exceeds the configured MaxNetworkMessageSize, it will be split into several Network Messages that will be sent at specified PublishingOffsets. The maximum MaxNetworkMessageSize allowed for UDP UADP communication is 65535 bytes.
The specification recommends that MaxNetworkMessageSize should not exceed the MTU minus the length of all the headers. However, such a small limit may lead to message chunking. The SDK supports chunking, but if you need to split messages, make sure that the Subscribers support it as well. For now, we use 4096, which works fine for our samples.
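To get a feeling for the numbers, the following plain Java sketch computes the largest UDP payload that fits into a single IPv4 packet and estimates how many Network Messages a given amount of data is split into. It is not SDK code: the class ChunkingSketch is made up for illustration, and the 64 bytes of per-message header overhead are an arbitrary assumption, since the real overhead depends on the configured content masks.

```java
// Hypothetical helper for reasoning about MaxNetworkMessageSize; not part of the SDK.
final class ChunkingSketch {

    static final int ETHERNET_MTU = 1500; // typical Ethernet MTU in bytes
    static final int IPV4_HEADER = 20;    // IPv4 header without options
    static final int UDP_HEADER = 8;

    /** Largest UDP payload that fits into one unfragmented IPv4 packet. */
    static int maxUnfragmentedPayload(int mtu) {
        return mtu - IPV4_HEADER - UDP_HEADER;
    }

    /**
     * Number of Network Messages needed for dataBytes of payload when every message
     * may be at most maxNetworkMessageSize bytes, headerBytesPerMessage of which
     * are assumed to be taken by headers.
     */
    static int chunkCount(int dataBytes, int maxNetworkMessageSize, int headerBytesPerMessage) {
        int payloadPerMessage = maxNetworkMessageSize - headerBytesPerMessage;
        if (payloadPerMessage <= 0) {
            throw new IllegalArgumentException("MaxNetworkMessageSize is too small for the headers");
        }
        return (dataBytes + payloadPerMessage - 1) / payloadPerMessage; // ceiling division
    }

    public static void main(String[] args) {
        int assumedHeaderBytes = 64; // arbitrary assumption for this sketch
        System.out.println(maxUnfragmentedPayload(ETHERNET_MTU));       // prints 1472
        System.out.println(chunkCount(10_000, 4096, assumedHeaderBytes)); // prints 3
        System.out.println(chunkCount(10_000, 1472, assumedHeaderBytes)); // prints 8
    }
}
```

Note that a 4096-byte datagram is larger than the 1472 bytes that fit into a standard Ethernet frame, so the IP layer fragments it. That is usually harmless on a local network, but worth keeping in mind when choosing the value.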
4.4.2. DataSet Writer
DataSet Writers specify how the DataSet messages are written. The configuration is similar for both Variable DataSets and Event DataSets that we created above, but again varies a bit depending on the encoding.
A single DataSet Writer always writes data from exactly one DataSet. The following example shows how to define a Variable DataSet Writer for UADP:
private void addVariableDataSetWriter(PubSubWriterGroupConf.Builder<?> group) {
PubSubDataSetWriterConf.Builder<?> builder = group.getTransportProfile().createDataSetWriterConfBuilder();
builder.setName(SAMPLE_VARIABLE_DATA_SET_WRITER_NAME);
builder.setDataSetWriterId(SAMPLE_VARIABLE_DATASET_WRITER_ID);
// Linking with the VariableDataSet that was created in createVariableDataSet
builder.setDataSetName(variableDataSet.getName());
/*
* To optimise message sizes, DataSetWriter writes DataSetMessages that contain only the changed
* values of its PublishedDataSet. These messages are Delta Frames. DataSetMessages that contain
* all values of the PublishedDataSet are Key Frames. Parameter KeyFrameCount defines the
* maximum amount of times the PublishingInterval expires before sending a new Key Frame
* Message. If set as 1, every DataSetMessage will be a Key Frame message.
*/
// Note that this parameter only has meaning for writers that write a variable dataset
builder.setKeyFrameCount(UnsignedInteger.valueOf(SAMPLE_KEYFRAME_COUNT));
if (builder instanceof PubSubUadpDataSetWriterConf.Builder) {
initUadpDataSetWriter((PubSubUadpDataSetWriterConf.Builder<?>) builder);
}
if (builder instanceof PubSubJsonDataSetWriterConf.Builder) {
initJsonDataSetWriter((PubSubJsonDataSetWriterConf.Builder<?>) builder, reversibleJson);
}
if (builder instanceof PubSubBrokerTransportSettingsConf.Builder) {
initBrokerTransportSettings((PubSubBrokerTransportSettingsConf.Builder<?>) builder, queueName, metadataQueueName);
}
group.addOrReplaceWriter(builder.build());
}
And the init methods for transport and encoding specifics:
private static void initUadpDataSetWriter(PubSubUadpDataSetWriterConf.Builder<?> builder) {
// Headers to include in the network messages
builder.setMessageContentMask(UadpDataSetMessageContentMask.Timestamp);
// DataSetFieldContentMask defines if we want to include meta information like StatusCode or
// SourceTimestamp for the published variables.
DataSetFieldContentMask fieldContentMask = DataSetFieldContentMask.of(DataSetFieldContentMask.StatusCode,
DataSetFieldContentMask.SourceTimestamp, DataSetFieldContentMask.ServerTimestamp);
builder.setFieldContentMask(fieldContentMask);
}
private static void initJsonDataSetWriter(PubSubJsonDataSetWriterConf.Builder<?> builder, boolean reversibleJson) {
// Headers to include in the network messages
builder.setMessageContentMask(JsonDataSetMessageContentMask.Timestamp);
// The fields to include in the dataset messages
// Included here since the value depends on the reversible json mode
DataSetFieldContentMask fieldContentMask = reversibleJson ? DataSetFieldContentMask.of()
: DataSetFieldContentMask.of(DataSetFieldContentMask.Fields.StatusCode,
DataSetFieldContentMask.Fields.SourceTimestamp);
builder.setFieldContentMask(fieldContentMask);
}
The DataSet Writer must also have a unique name and ID.
When a DataSet message contains all the values of a Published DataSet, it is called a Key Frame message. When it contains only changes it is called a Delta Frame message. The KeyFrameCount parameter defines how often a Key Frame message is sent: if it is one, every message is a Key Frame, which can be optimal in some situations.
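The effect of KeyFrameCount can be sketched with a simple counter. The class below is a plain Java illustration, not SDK code, and it simplifies matters by assuming that a message is sent on every publishing cycle and that the very first message is a Key Frame.

```java
// Hypothetical model of the KeyFrameCount parameter; not part of the SDK.
final class KeyFrameSketch {

    /** True if the message of the given publishing cycle (starting at 0) is a Key Frame. */
    static boolean isKeyFrame(long cycle, int keyFrameCount) {
        if (keyFrameCount < 1) {
            throw new IllegalArgumentException("keyFrameCount must be at least 1");
        }
        return cycle % keyFrameCount == 0;
    }

    /** Renders the first cycles as a string: K = Key Frame, D = Delta Frame. */
    static String schedule(int cycles, int keyFrameCount) {
        StringBuilder sb = new StringBuilder();
        for (int cycle = 0; cycle < cycles; cycle++) {
            sb.append(isKeyFrame(cycle, keyFrameCount) ? 'K' : 'D');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(schedule(8, 1)); // prints KKKKKKKK
        System.out.println(schedule(8, 4)); // prints KDDDKDDD
    }
}
```

A larger KeyFrameCount saves bandwidth, but a Subscriber that joins late or misses a message may have to wait longer for the next complete set of values.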
The DataSetFieldContentMask defines the value fields that are included in the messages.
For MQTT connections, QoS parameters and specific queues for DataSet and DataSetMetaData messages are set in initBrokerTransportSettings.
A similar configuration can be defined for an EventDataSetWriter. See the method addEventDataSetWriter() in the SamplePubSubConfiguration.
4.5. Subscriber
The Subscriber specific configuration consists of Reader Groups and DataSet Readers.
4.5.1. Reader Group
A Reader Group reads Network Messages from a connection. A basic Reader Group is configured as follows:
private void addReaderGroup(PubSubConnectionConf.Builder<?> connection) {
PubSubReaderGroupConf.Builder<?> builder = connection.getTransportProfile().createReaderGroupConfBuilder();
builder.setName(SAMPLE_READER_GROUP_NAME);
if (builder instanceof PubSubJsonReaderGroupConf.Builder) {
initJsonReaderGroup((PubSubJsonReaderGroupConf.Builder<?>) builder);
}
if (builder instanceof PubSubUadpReaderGroupConf.Builder) {
initUadpReaderGroup((PubSubUadpReaderGroupConf.Builder<?>) builder);
}
addVariableDataSetReader(builder);
addEventDataSetReader(builder);
connection.addOrReplaceGroup(builder.build());
}
And the init parts for transport and message encoding specifics:
private static void initUadpReaderGroup(PubSubUadpReaderGroupConf.Builder<?> builder) {
// Set the maximum Network Message size accepted by this Reader Group. Publishers chunk messages
// that exceed their own limit. Note that 65535 is the maximum size allowed for UDP connections.
// NOTE! This setting must be equal to or larger than that of the largest Publisher being
// listened to.
builder.setMaxNetworkMessageSize(UnsignedInteger.valueOf(4096));
}
private static void initJsonReaderGroup(PubSubJsonReaderGroupConf.Builder<?> builder) {
/*
* JSON messages don't need a limit, though this is mainly since they are only usable with MQTT.
*/
builder.setMaxNetworkMessageSize(UnsignedInteger.MAX_VALUE);
}
4.5.2. DataSet Reader
DataSet Readers are used to read the DataSet messages from Network Messages.
A DataSet Reader that decodes DataSet messages from a specific Publisher and from a specific DataSet Writer can be defined this way:
private void addVariableDataSetReader(PubSubReaderGroupConf.Builder<?> group) {
final PubSubDataSetReaderConf.Builder<?> builder = group.getTransportProfile().createDataSetReaderConfBuilder();
builder.setName(SAMPLE_VARIABLE_DATASET_READER_NAME);
// PublisherId and DataSetWriterId can be used to receive messages from a
// specific Publisher and DataSetWriter only, respectively
builder.setPublisherId(publisherId);
builder.setDataSetWriterId(SAMPLE_VARIABLE_DATASET_WRITER_ID);
// The reader can copy data automatically to target variables in the server's own address
// space if targetVariableId is defined
builder.setSubscribedDataSet(createTargetVariables(targetVariableId));
if (builder instanceof PubSubBrokerTransportSettingsConf.Builder) {
initBrokerTransportSettings((PubSubBrokerTransportSettingsConf.Builder<?>) builder, queueName, metadataQueueName);
}
group.addOrReplaceReader(builder.build());
}
And the init part for the transport specifics:
private static void initBrokerTransportSettings(PubSubBrokerTransportSettingsConf.Builder<?> builder,
final String queueName, final String metadataQueueName) {
// For MQTT connections we need to set the queueNames and some QoS parameters
/*
* IMPORTANT! For now the queue names are assumed to be non-wildcards.
*/
builder.setQueueName(queueName);
builder.setMetaDataQueueName(metadataQueueName);
builder.setRequestedDeliveryGuarantee(BrokerTransportQualityOfService.AtLeastOnce);
// Note that this is only needed in the Publisher side
builder.setMetaDataUpdateTime(Duration.ofSeconds(30));
}
In addition, if you’re using the MQTT protocol, you need to set up the queues on which the messages, both data and meta data, are listened to. This corresponds to the respective transport settings for the DataSet Writer. See initBrokerTransportSettings for details.
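The filtering that a DataSet Reader performs with PublisherId and DataSetWriterId can be pictured as a simple predicate. The following plain Java sketch is not SDK code: the class ReaderFilterSketch is made up for illustration, and it assumes that a null PublisherId or a DataSetWriterId of 0 in the reader configuration means that the value is not used for filtering.

```java
import java.util.Objects;

// Hypothetical model of DataSet Reader filtering; not part of the SDK.
final class ReaderFilterSketch {

    /**
     * Returns true if a message with the given header values passes the reader's filter.
     * Assumption of this sketch: a null PublisherId or a DataSetWriterId of 0 in the
     * reader configuration means "do not filter on this value".
     */
    static boolean accepts(Object readerPublisherId, int readerWriterId,
            Object messagePublisherId, int messageWriterId) {
        boolean publisherOk =
                readerPublisherId == null || Objects.equals(readerPublisherId, messagePublisherId);
        boolean writerOk = readerWriterId == 0 || readerWriterId == messageWriterId;
        return publisherOk && writerOk;
    }

    public static void main(String[] args) {
        System.out.println(accepts("Publisher1", 1, "Publisher1", 1)); // prints true
        System.out.println(accepts("Publisher1", 1, "Publisher1", 2)); // prints false
        System.out.println(accepts("Publisher1", 1, "Publisher2", 1)); // prints false
        System.out.println(accepts(null, 0, "Publisher2", 7));         // prints true
    }
}
```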
DataSet Metadata
By setting DataSetMetaData information for the DataSet Reader, we can decode the received DataSet message - even if only a subset of the DataSet specified in the Publisher side is included in the message. DataSetMetaData can be retrieved automatically with Discovery mechanisms, as described in Discovery functionality. However, if Discovery is not available but you know the message structure, you can also manually construct a PubSubDataSetMetaDataConf object and set it for the DataSet Reader with
builder.setDataSetMetaData(dataSetMetaDataConf);
The samples don’t do this, because they rely on the Publisher samples to support metadata discovery.
PubSubSystemEvents
The DataSet Reader is configured to receive data or events in DataSet messages. You can use TargetVariables to map data automatically to Nodes in the server’s own address space or you can just listen to PubSubSystemEvents.
The word event in PubSubSystemEvents refers to the events generated by the PubSubSystem itself. They are not to be confused with PubSub event-dataset(s) data. Thus, you will receive a PubSubSystemEvent, which would hold either variable-dataset or event-dataset data.
The sample subscribers configure the listener as follows:
public static void addPubSubSystemListeners(PubSubSystem pubSubSystem) {
...
pubSubSystem.addEventListener(PubSubSystemEvents.PUB_SUB_DATASET_MESSAGE_RECEIVED, event -> {
try {
PubSubDataSetMessage msg = event.getDataSetMessage();
String ds =
dataSetClassIdNullOrZero(msg.getDataSetClassId()) ? "" : ", DataSetClassId=" + msg.getDataSetClassId();
println(String.format("* DataSetMessage: PublisherId=%s, DataSetWriterId=%s%s", event.getPublisherId(),
msg.getDataSetWriterId(), ds));
printDataSetFields(msg.getDataSetFields());
} catch (Exception e) {
println("* DataSetMessage ERROR: " + e);
}
}, executor);
...
}
TargetVariables
OPC UA server applications that act as Subscribers can also copy variable data automatically to nodes in their own address space.
The mapping between the VariableDataSet and Nodes is done via TargetVariables.
public static final UaNodeId SAMPLE_TARGET_VARIABLE_ID = UaNodeId.from(SAMPLE_NAMESPACE, "MyPubSubTargetVariable");
private static PubSubTargetVariablesConf createTargetVariables(UaNodeId targetVariableId) {
if (targetVariableId == null) {
return null;
}
final PubSubTargetVariablesConf.Builder builder = PubSubTargetVariablesConf.builder();
// The SamplePubSubPublisherServer is sending two Variables. We want to map the first one
// to a MyPubSubTargetVariable Node. For this, a single TargetVariable configuration is created,
// leaving the second received Variable unhandled.
// Define a target variable for a single dataset field
builder.addTargetVariables(
PubSubFieldTargetConf.builder().setAttributeId(Attributes.Value).setTargetNodeId(targetVariableId).build());
return builder.build();
}
In this code example, only one TargetVariable is configured, meaning that the first variable in a DataSet is assigned to be written to the Node MyPubSubTargetVariable. When you need to assign all variables of a DataSet to specific Nodes, add more variables with addTargetVariables.
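The mapping described above, where the first configured TargetVariable receives the first field of the DataSet, can be modelled as follows. This is a plain Java sketch and not SDK code: the class TargetVariablesSketch is made up for illustration, node IDs are simplified to strings, and the purely positional mapping is an assumption based on the description above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of mapping DataSet fields to target Nodes; not part of the SDK.
final class TargetVariablesSketch {

    /**
     * Pairs the i-th received field with the i-th configured target Node.
     * Fields without a configured target are left unhandled.
     */
    static Map<String, Object> apply(List<String> targetNodeIds, List<Object> fields) {
        Map<String, Object> written = new LinkedHashMap<>();
        int count = Math.min(targetNodeIds.size(), fields.size());
        for (int i = 0; i < count; i++) {
            written.put(targetNodeIds.get(i), fields.get(i));
        }
        return written;
    }

    public static void main(String[] args) {
        // Two fields are received, but only one target is configured
        List<String> targets = List.of("MyPubSubTargetVariable");
        List<Object> fields = List.of(42, "second field");
        System.out.println(apply(targets, fields)); // prints {MyPubSubTargetVariable=42}
    }
}
```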
A similar configuration can be defined for an EventDataSetReader. See the method addEventDataSetReader() in the SamplePubSubConfiguration.