> 文章列表 > fastDDS之Subscriber

fastDDS之Subscriber

fastDDS之Subscriber

订阅由定义了DataReader与Subscriber的关联。为了接收发布的消息,应用程序需要再Subscriber创建一个新的DataReader。这个DataReader将被绑定到描述将要接收的数据类型的Topic上,然后就开始开始从与此Topic匹配的Publisher接收数据。
当Subscriber接收到数据时,它通知应用程序有新数据可用。然后,应用程序通过DataReader来获取接收到的数据。
fastDDS之Subscriber
Subscriber充当属于它的一个或多个DataReader对象的代表,作为一个容器,允许在Subscriber的SubscriberQos给出的公共配置下对不同的DataReader对象进行分组。
属于同一Subscriber的DataReader对象之间除了订阅者的SubscriberQos之外没有任何其他关系,否则它们相互独立。具体来说,Subscriber可以托管不同的Topic和数据类型的DataReader对象。

SubscriberQos

SubscriberQos控制Subscriber的行为。在内部它包含以下QosPolicy对象:
fastDDS之Subscriber
使用Subscriber::set_qos()成员函数可以修改先前创建的Subscriber的QoS值。

// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a Subscriber with default SubscriberQos
Subscriber* subscriber =participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
SubscriberQos qos = subscriber->get_qos();// Modify QoS attributes
qos.entity_factory().autoenable_created_entities = false;// Assign the new Qos to the object
subscriber->set_qos(qos);

默认的SubscriberQos是指DomainParticipant实例上get_default_subscriber_qos()成员函数返回的值。常量对象SUBSCRIBER_QOS_DEFAULT可作为create_subscriber()或Subscriber::set_qos()成员函数的QoS参数,表示使用当前默认的SubscriberQos。
当系统启动时,默认的SubscriberQos与默认构造值SubscriberQos()等效。默认的SubscriberQos可以在任何时候使用DomainParticipant实例上的set_default_subscriber_qos()成员函数进行修改。修改默认的SubscriberQos不会影响已经存在的Subscriber实例。
set_default_subscriber_qos()成员函数也接受特殊值SUBSCRIBER_QOS_DEFAULT作为输入参数。这将把当前默认的SubscriberQos重置为默认构造值SubscriberQos()。

SubscriberListener

SubscriberListener是一个抽象类,定义了将在响应Subscriber的状态更改时触发的回调。默认情况下,所有这些回调函数都是空的,不做任何事情。用户应该实现该类的特化,重写应用程序上需要的回调。未被重写的回调将维持它们空的实现。
SubscriberListener继承自DataReaderListener。因此,它对报告给DataReader的所有事件做出反应。由于事件总是被通知给能够处理事件的最特定的实体监听器,因此只有在触发DataReader没有附加监听器,或者被DataReader上的StatusMask禁用回调时,才会回调SubscriberListener从DataReaderListener继承的回调。此外,SubscriberListener添加了以下回调:
on_data_on_readers():在Subscriber的任何DataReader上有可用新数据。对该回调的调用没有排队,这意味着如果一次接收到几个新的数据更改,则只能为所有这些更改发出一个回调调用,而不是每个更改发出一个。如果应用程序正在此回调中检索接收到的数据,那么它必须一直读取数据,直到没有新的更改。

class CustomSubscriberListener : public SubscriberListener
{public:CustomSubscriberListener(): SubscriberListener(){}virtual ~CustomSubscriberListener(){}virtual void on_data_on_readers(Subscriber* sub){static_cast<void>(sub);std::cout << "New data available" << std::endl;}};

Subscriber创建与删除

Subscriber总是隶属于一个DomainParticipant,可以通过DomainParticipant的create_subscriber()方法创建,通过delete_subscriber()函数删除一个指定的Subscriber

Subscriber* create_subscriber(const SubscriberQos& qos,SubscriberListener* listener = nullptr,const StatusMask& mask = StatusMask::all())ReturnCode_t delete_subscriber(const Subscriber* subscriber)
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a Subscriber with default SubscriberQos and no Listener
// The value SUBSCRIBER_QOS_DEFAULT is used to denote the default QoS.
Subscriber* subscriber_with_default_qos =participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber_with_default_qos)
{// Errorreturn;
}// A custom SubscriberQos can be provided to the creation method
SubscriberQos custom_qos;// Modify QoS attributes
// (...)Subscriber* subscriber_with_custom_qos =participant->create_subscriber(custom_qos);
if (nullptr == subscriber_with_custom_qos)
{// Errorreturn;
}// Create a Subscriber with default QoS and a custom Listener.
// CustomSubscriberListener inherits from SubscriberListener.
// The value SUBSCRIBER_QOS_DEFAULT is used to denote the default QoS.
CustomSubscriberListener custom_listener;
Subscriber* subscriber_with_default_qos_and_custom_listener =participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT, &custom_listener);
if (nullptr == subscriber_with_default_qos_and_custom_listener)
{// Errorreturn;
}// Delete the entities the subscriber created
if (subscriber->delete_contained_entities() != ReturnCode_t::RETCODE_OK)
{// Subscriber failed to delete the entities it createdreturn;
}// Delete the Subscriber
if (participant->delete_subscriber(subscriber) != ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}

DataReader

DataReader必须附属在一个确定的Subscriber,一旦创建就要被绑定到一个Topic上。该Topic必须在创建DataReader之前存在,并且必须绑定到DataReader想要发布的数据类型。
在Subscriber中为特定Topic创建新DataReader的就是为了使用Topic描述的名称和数据类型发起新订阅。创建之后,当从远程发布接收到数据值更改时,将会通知到应用程序。此外可以通过DataReader的DataReader::read_next_sample()或DataReader::take_next_sample()成员函数来检索这些更改。
可以通过DataReaderQos控制DataReader的行为,其内部它包含以下QosPolicy对象:
fastDDS之Subscriber
通过DataReader的DataReader::set_qos()成员函数修改可以修改QoS值。默认的DataReaderQos是由Subscriber实例上的get_default_datareader_qos()成员函数返回的值。常量对象DATAREADER_QOS_DEFAULT可以作为create_datareader()或DataReader::set_qos()成员函数的QoS参数,表示应该使用当前默认的DataReaderQos。当系统启动时,默认的DataReaderQos等同于默认构造的DataReaderQos()。默认的DataReaderQos可以在任何时候使用Subscriber实例上的set_default_datareader_qos()成员函数修改。修改默认DataReaderQos不会影响已经存在的DataReader实例。
set_default_datareader_qos()成员函数也接受特殊值DATAREADER_QOS_DEFAULT作为输入参数。这将重置当前默认DataReaderQos为默认构造值DataReaderQos()。

// Get the current QoS or create a new one from scratch
DataReaderQos qos_type1 = subscriber->get_default_datareader_qos();// Modify QoS attributes
// (...)// Set as the new default DataReaderQos
if (subscriber->set_default_datareader_qos(qos_type1) != ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}// Create a DataReader with the new default DataReaderQos.
DataReader* data_reader_with_qos_type1 =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader_with_qos_type1)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
DataReaderQos qos_type2;// Modify QoS attributes
// (...)// Set as the new default DataReaderQos
if (subscriber->set_default_datareader_qos(qos_type2) != ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}// Create a DataReader with the new default DataReaderQos.
DataReader* data_reader_with_qos_type2 =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader_with_qos_type2)
{// Errorreturn;
}// Resetting the default DataReaderQos to the original default constructed values
if (subscriber->set_default_datareader_qos(DATAREADER_QOS_DEFAULT)!= ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}// The previous instruction is equivalent to the following
if (subscriber->set_default_datareader_qos(DataReaderQos())!= ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}

DataReaderListener

DataReaderListener是一个抽象类,定义了当DataReader上的状态更改时将触发的回调。默认情况下,所有的回调函数都是空的,不做任何事情。用户应该实现该类的特化,重写应用程序上需要的回调。未重写的回调将维持它们空的实现。
DataReaderListener定义了以下回调:
1.on_data_available(): DataReader上有应用程序可用的新数据。对该回调的调用没有排队,这意味着如果一次接收到几个新的数据更改,则只能为所有这些更改发出一个回调调用,而不是每个更改发出一个。如果应用程序正在此回调中检索接收到的数据,那么它必须一直读取数据,直到没有新的更改。
2.on_subscription_matched(): DataReader已经找到了与主题匹配,并且具有公共分区和兼容的QoS的DataWriter,或者重新与已停止匹配的DataWriter匹配。又或者当匹配的DataWriter改变了它的DataWriterQos时,也会触发它。
3.on_requested_deadline_missed(): DataReader在其DataReaderQos上配置的截止时间内没有收到数据。对于DataReader错过数据的每个截止日期和数据实例,都将调用它。
4.on_requested_incompatible_qos(): DataReader已经找到了一个与主题匹配,且处于同一公共分区的datwriter,但是其QoS与DataReader上定义的QoS不兼容。
5.on_liveliness_changed():匹配的datwriter的活动状态已经改变。要么是非活动的DataWriter已变为活动,要么相反。
6.on_sample_rejected():接收的数据样本被拒绝。更多信息请参见SampleRejectedStatus。
7.on_sample_lost():数据样本丢失,永远不会接收到。有关更多信息,请参阅SampleLostStatus。

DataReader创建与删除

DataReader始终需要依附在一个 Subscriber上,可以通过Subscriber实例上的create_datareader()成员函数完成创建,delete_datareader()成员函数完成删除。

DataReader* create_datareader(TopicDescription* topic,const DataReaderQos& reader_qos,DataReaderListener* listener = nullptr,const StatusMask& mask = StatusMask::all()ReturnCode_t delete_datareader(const DataReader* reader);

以下是使用例程:

// Create a DataReader with default DataReaderQos and no Listener
// The value DATAREADER_QOS_DEFAULT is used to denote the default QoS.
DataReader* data_reader_with_default_qos =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader_with_default_qos)
{// Errorreturn;
}// A custom DataReaderQos can be provided to the creation method
DataReaderQos custom_qos;// Modify QoS attributes
// (...)DataReader* data_reader_with_custom_qos =subscriber->create_datareader(topic, custom_qos);
if (nullptr == data_reader_with_custom_qos)
{// Errorreturn;
}// Create a DataReader with default QoS and a custom Listener.
// CustomDataReaderListener inherits from DataReaderListener.
// The value DATAREADER_QOS_DEFAULT is used to denote the default QoS.
CustomDataReaderListener custom_listener;
DataReader* data_reader_with_default_qos_and_custom_listener =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT, &custom_listener);
if (nullptr == data_reader_with_default_qos_and_custom_listener)
{// Errorreturn;
}// Delete the entities the DataReader created
if (data_reader->delete_contained_entities() != ReturnCode_t::RETCODE_OK)
{// DataReader failed to delete the entities it created.return;
}// Delete the DataReader
if (subscriber->delete_datareader(data_reader) != ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}

SampleInfo

DataReader检索sample时,除了sample数据外,还返回SampleInfo实例。该对象包含补充返回数据值的附加信息,有助于对其进行解释。例如,如果valid_data值为false,则DataReader不会通知应用程序数据实例中的新值,而是通知其状态的更改,必须丢弃返回的数据值。
接下来,将描述SampleInfo的数据成员,以及与返回的sample数据相关的每个数据成员的含义。
1.sample_state:表示之前是否已经读取了相应的数据样例。它可以取以下值之一:
(1)READ:这是该数据样本第一次被检索。
(2)NOT_READ:数据示例之前已经读取或获取过。
2.view_state:表示这是否是DataReader检索的该数据实例的第一个示例。它可以取以下值之一:
(1)NEW:这是第一次检索该实例的示例。
(2)NOT_NEW:此实例的其他示例以前已经检索过。
3.instance_state:表示实例是当前存在还是已被释放。如已释放,它还提供关于清除的原因。它可以取以下值之一:
(1)ALIVE:实例当前存在。
(2)NOT_ALIVE_DISPOSED:远程DataWriter释放实例。
(3)NOT_ALIVE_NO_WRITERS: DataReader丢弃了实例,因为发布实例的远程DataWriter没有活的。
4.disposed_generation_count:表示实例在被释放后活跃起来的次数。
5.no_writers_generation_count:表示实例由于NOT_ALIVE_NO_WRITERS被释放为后活跃起来的次数。
6.sample_rank:表示在此之后收到的同一实例的sample数量。例如,值为5意味着DataReader上有5个更新的sample可用。
7.generation_rank:表示从接收sample到接收仍然保存在集合中的同一实例的最新sample之间,实例被释放并重新激活的次数
8.absolute_generation_rank:表示从接收sample到接收同一实例(可能不在集合中)的最新sample之间,实例被释放并重新激活的次数。
9.source_timestamp:保存了DataWriter在发布sample时提供的时间戳。
10.instance_handle:本地实例句柄。
11.publication_handle:发布数据的DataWriter的句柄。
12.valid_data:是一个表示数据sample值是否发生变化的bool值。如果只是实例状态发生变化,该标志设为false,这种情况下,就应该忽略该数据sample。
13.sample_identity:是请求者-应答者配置的扩展。它包含DataWriter和当前消息的序列号,应答者在发送应答时使用它来填充related_sample_identity。
14.related_sample_identity:是请求-应答器配置的扩展。在应答消息中,它包含相关请求消息的sample_identity。请求者使用它将每个应答链接到适当的请求。

获取接收的数据

应用程序可以通过读取或接收来访问和使用DataReader上接收到的数据值。
1.读取可以通过以下方法:
(1)DataReader::read_next_sample()读取DataReader上可用的下一个先前未访问的数据值,并将其存储在提供的数据缓冲区中。
(2)DataReader::read()、DataReader::read_instance()和DataReader::read_next_instance()提供了获取匹配特定条件的样本集合的机制。
2.接收可以通过以下方法:
(1)DataReader::take_next_sample()读取DataReader上可用的下一个先前未访问的数据值,并将其存储在提供的数据缓冲区中。
(2)DataReader::take()、DataReader::take_instance()和DataReader::take_next_instance()提供了获取匹配特定条件的样本集合的机制。
当DataReader中没有符合条件的数据时,所有操作将返回NO_DATA,输出参数保持不变。此外,数据访问还提供了有助于解释返回数据值的SampleInfo实例,如原始DataWriter或发布时间戳。

DataReader::read()和DataReader::take()操作(及其变体)以两个队列形式返回信息,一个是接收到的DDS数据样本的数据类型队列,一个是对应每个DDS样本信息的SampleInfo队列。
这些队列是通过DataReader::read()和DataReader::take()操作的参数传入。当传入的队列为空时(已初始化,但最大长度为0),中间件将用直接从接收队列本身借来的内存填充这些序列。借出数据或SampleInfo是没有拷贝的,是索数据最有效的方法。
但是,在这样做时,代码必须将借出的队列用完后返还给中间件,以便接收队列可以重用它们。如果应用程序没有调用DataReader::return_loan()操作返还,那么Fast DDS最终将耗尽内存,DataReader无法存储从网络接收的DDS数据样本。
从返还的队列中获取数据非常容易,队列提供了api length()用于查看元素的数量。应用程序代码只需要检查这个值并使用[]操作符来访问相应的元素。只有当SampleInfo队列上的相应元素存在有效数据时,才应该访问DDS数据队列上的元素。例程如下:

    // Sequences are automatically initialized to be empty (maximum == 0)FooSeq data_seq;SampleInfoSeq info_seq;// with empty sequences, a take() or read() will return loaned// sequence elementsReturnCode_t ret_code = data_reader->take(data_seq, info_seq,LENGTH_UNLIMITED, ANY_SAMPLE_STATE,ANY_VIEW_STATE, ANY_INSTANCE_STATE);// process the returned dataif (ret_code == ReturnCode_t::RETCODE_OK){// Both info_seq.length() and data_seq.length() will have the number of samples returnedfor (FooSeq::size_type n = 0; n < info_seq.length(); ++n){// Only samples for which valid_data is true should be accessedif (info_seq[n].valid_data){// Do something with data_seq[n]}}// must return the loaned sequences when done processingdata_reader->return_loan(data_seq, info_seq);}

数据有获取有如下一种方式:
1.回调
当DataReader从任何匹配的DataWriter接收到新的数据值时,它会通过两个Listener回调通知应用程序:
(1)on_data_available()。
(2)on_data_on_readers()。
这些回调可用于检索新到达的数据,如下面的示例所示:

class CustomizedDataReaderListener : public DataReaderListener
{public:CustomizedDataReaderListener(): DataReaderListener(){}virtual ~CustomizedDataReaderListener(){}void on_data_available(DataReader* reader) override{// Create a data and SampleInfo instanceFoo data;SampleInfo info;// Keep taking data until there is nothing to takewhile (reader->take_next_sample(&data, &info) == ReturnCode_t::RETCODE_OK){if (info.valid_data){// Do something with the datastd::cout << "Received new data value for topic "<< reader->get_topicdescription()->get_name()<< std::endl;}else{std::cout << "Remote writer for topic "<< reader->get_topicdescription()->get_name()<< " is dead" << std::endl;}}}};

2.通过一个等待线程
应用程序可以通过一个专门的线程来等待DataReader上的新数据,使用一个wait-set来等待DataAvailable状态的变化。

// Create a DataReader
DataReader* data_reader =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader)
{// Errorreturn;
}// Prepare a wait-set to wait for data on the DataReader
WaitSet wait_set;
StatusCondition& condition = data_reader->get_statuscondition();
condition.set_enabled_statuses(StatusMask::data_available());
wait_set.attach_condition(condition);// Create a data and SampleInfo instance
Foo data;
SampleInfo info;//Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);// Loop reading data as it arrives
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies
while (true)
{ConditionSeq active_conditions;if (ReturnCode_t::RETCODE_OK == wait_set.wait(active_conditions, timeout)){while (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample(&data, &info)){if (info.valid_data){// Do something with the datastd::cout << "Received new data value for topic "<< topic->get_name()<< std::endl;}else{// If the remote writer is not alive, we exit the reading loopstd::cout << "Remote writer for topic "<< topic->get_name()<< " is dead" << std::endl;break;}}}else{std::cout << "No data this time" << std::endl;}
}

使用DataReader::wait_for_unread_message()成员函数也可以达到同样的效果,该成员函数会产生阻塞,直到新的数据sample可用或给定时间超时。如果超时后没有新的数据可用,它将返回值为false。这个函数返回值为true,意味着DataReader上有新数据可供应用程序检索。

// Create a DataReader
DataReader* data_reader =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader)
{// Errorreturn;
}// Create a data and SampleInfo instance
Foo data;
SampleInfo info;//Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);// Loop reading data as it arrives
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies
while (true)
{if (data_reader->wait_for_unread_message(timeout)){if (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample(&data, &info)){if (info.valid_data){// Do something with the datastd::cout << "Received new data value for topic "<< topic->get_name()<< std::endl;}else{// If the remote writer is not alive, we exit the reading loopstd::cout << "Remote writer for topic "<< topic->get_name()<< " is dead" << std::endl;break;}}}else{std::cout << "No data this time" << std::endl;}
}