Message Queue (MQ) - Publish Subscribe Message

This page is the programming guide for Message Queue (MQ). For detailed scenarios and an introduction, see https://wiki.iotivity.org/message_queue_mq_for_publish-subscribe_interactions

API Design

The APIs below are provided. Every topic is represented as a resource, the same concept used elsewhere in the SDK, so you can call the MQ APIs like any other API of the OCResource class in the C++ SDK.

// SUB - subscribe to a topic
OCStackResult subscribeMQTopic(JNIEnv* env, const QueryParamsMap &queryParametersMap,
                               jobject jListener, QualityOfService QoS);

OCStackResult unsubscribeMQTopic(QualityOfService QoS);

OCStackResult requestMQPublish(JNIEnv* env, const QueryParamsMap &queryParametersMap,
                               jobject jListener, QualityOfService QoS);

// PUB - publish to a topic
OCStackResult publishMQTopic(JNIEnv* env, const OCRepresentation &representation,
                             const QueryParamsMap &queryParametersMap,
                             jobject jListener, QualityOfService QoS);

// PUB, SUB and Broker can all use these
OCStackResult discoveryMQTopics(JNIEnv* env, const QueryParamsMap &queryParametersMap,
                                jobject jListener, QualityOfService QoS);

OCStackResult createMQTopic(JNIEnv* env, const OCRepresentation &representation,
                            const std::string &targetUri,
                            const QueryParamsMap &queryParametersMap,
                            jobject jListener, QualityOfService QoS);
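
To make the three roles concrete, the following is a minimal in-memory sketch of the publish-subscribe model that these APIs operate on. It does not use IoTivity at all: `ToyBroker`, `Message`, `Subscriber` and every method name are hypothetical stand-ins chosen for illustration, and the real calls are asynchronous and go over the network.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration; none of these names are IoTivity APIs.
using Message = std::map<std::string, std::string>;
using Subscriber = std::function<void(const Message&)>;

class ToyBroker {
public:
    // Create a topic. Returns false if the URI already exists.
    bool createTopic(const std::string& uri, const std::string& type) {
        return topics_.emplace(uri, Topic{type, {}}).second;
    }

    // List topic URIs; an empty type matches every topic.
    std::vector<std::string> discoverTopics(const std::string& type = "") const {
        std::vector<std::string> uris;
        for (const auto& entry : topics_) {
            if (type.empty() || entry.second.type == type) {
                uris.push_back(entry.first);
            }
        }
        return uris;
    }

    // Register a subscriber. Returns false if the topic is unknown.
    bool subscribe(const std::string& uri, Subscriber subscriber) {
        assert(subscriber && "subscriber must be callable");
        auto it = topics_.find(uri);
        if (it == topics_.end()) return false;
        it->second.subscribers.push_back(std::move(subscriber));
        return true;
    }

    // Deliver a message to every subscriber of the topic.
    // Returns the number of subscribers notified, or -1 if the topic is unknown.
    int publish(const std::string& uri, const Message& message) const {
        auto it = topics_.find(uri);
        if (it == topics_.end()) return -1;
        for (const auto& subscriber : it->second.subscribers) subscriber(message);
        return static_cast<int>(it->second.subscribers.size());
    }

private:
    struct Topic {
        std::string type;
        std::vector<Subscriber> subscribers;
    };
    std::map<std::string, Topic> topics_;
};
```

In this toy model a publisher calls `createTopic` and `publish`, a subscriber calls `discoverTopics` and `subscribe`, and the broker only routes messages. That mirrors the division of roles in the API list above.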

C++ API Usage

1. Discover Topics (subscriber -> broker, publisher -> broker)

A target topic is needed before subscribing or publishing on an MQ broker, so first get the topic list from the MQ broker. You can also filter by topic type, as in the sample below.

string topicType;
cin >> topicType;
 
QueryParamsMap query;
query["rt"] = string(topicType);
 
if (psResource)  // MQ Broker
{
    OCStackResult result = psResource->discoveryMQTopics(query, onFoundMQTopic, QualityOfService::LowQos);
}
else
{
    cout << "MQ broker is not valid" << endl;
}

The topics are then delivered to a callback of the form below. This callback is similar to the callback of the OCFindResource API.

void onFoundMQTopic(const int eCode, const string uri, shared_ptr<OC::OCResource> topic)
{
    cout << "topic uri : " << topic->uri() << endl;
    targetTopic = topic;
    ...
}
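
Because the discovery callback is invoked once per topic and only after the request returns, the caller often needs a way to wait for results. The following is a minimal sketch of one way to do that, assuming callbacks may be delivered on a thread other than the one that started discovery. `TopicCollector` is a hypothetical helper, not an IoTivity class, and it stores plain URI strings instead of resource handles to stay self-contained.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical helper; not part of IoTivity.
class TopicCollector {
public:
    // Call from the discovery callback, once per reported topic.
    void add(const std::string& uri) {
        assert(!uri.empty());
        {
            std::lock_guard<std::mutex> lock(mutex_);
            uris_.push_back(uri);
        }
        cv_.notify_all();
    }

    // Block until at least `count` topics have arrived or the timeout expires.
    // Returns true if enough topics arrived in time.
    bool waitFor(std::size_t count, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return cv_.wait_for(lock, timeout, [&] { return uris_.size() >= count; });
    }

    // Return a copy of the URIs collected so far.
    std::vector<std::string> snapshot() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return uris_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<std::string> uris_;
};
```

With such a helper, a callback like onFoundMQTopic would call `add(topic->uri())`, and the code that started discovery would call `waitFor` before picking a target topic.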

2. Subscribe Topic (subscriber -> broker)

To subscribe to a topic discovered from the MQ broker, use the subscribeMQTopic API. This API is exposed only when you build with the 'SUB' option (e.g. WITH_MQ=SUB).

QueryParamsMap query;
if (targetTopic)
{
    targetTopic->subscribeMQTopic(ObserveType::Observe, query, &onSubscribedCB, QualityOfService::LowQos);
}
else
{
    cout << "Topic is not valid" << endl;
}

Once subscribed, you receive the topic's information from the MQ broker whenever the Publisher publishes the topic. The party that subscribes to a topic and receives its published information from the MQ broker is called the Subscriber.

void onSubscribedCB(const HeaderOptions /*headerOptions*/, const OCRepresentation& rep,
                    const int& eCode, const int& sequenceNumber)
{
    cout << "(onSubscribedCB) response code: " << eCode << endl;
    cout << "(onSubscribedCB) sequenceNumber: " << sequenceNumber << endl;
    cout << "(onSubscribedCB) blue light state - " << rep.getValueToString(string("blue")) << endl;
    cout << "(onSubscribedCB) red light state - " << rep.getValueToString(string("red")) << endl;
}

You can unsubscribe from the topic when you no longer need it.

targetTopic->unsubscribeMQTopic(QualityOfService::LowQos);
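
Forgetting to unsubscribe leaves the broker sending notifications nobody reads. One possible way to tie the unsubscribe call to a scope is a small RAII guard, sketched below. `SubscriptionGuard` is a hypothetical helper, not an IoTivity class; it takes any callable, so the actual unsubscribe call is supplied by the caller.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper; not part of IoTivity.
class SubscriptionGuard {
public:
    // `unsubscribe` is the action to run when the guard is released or destroyed.
    explicit SubscriptionGuard(std::function<void()> unsubscribe)
        : unsubscribe_(std::move(unsubscribe)) {
        assert(unsubscribe_ && "guard needs an action to run");
    }

    SubscriptionGuard(const SubscriptionGuard&) = delete;
    SubscriptionGuard& operator=(const SubscriptionGuard&) = delete;

    ~SubscriptionGuard() { release(); }

    // Unsubscribe now; later calls and the destructor become no-ops.
    void release() {
        if (unsubscribe_) {
            std::function<void()> action = std::move(unsubscribe_);
            unsubscribe_ = nullptr;
            action();
        }
    }

private:
    std::function<void()> unsubscribe_;
};
```

A caller could construct the guard right after a successful subscribe, for example with a lambda that calls `targetTopic->unsubscribeMQTopic(QualityOfService::LowQos)`. The action runs from the destructor, so it should not throw.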

3. Create Topic (subscriber -> broker, publisher -> broker)

There is no specific rule about whether the publisher or the subscriber creates a topic, so either side can create one with the createMQTopic(..) API. The other side must then discover the topic using topic discovery (step 1).

string topicType;
cin >> topicType;

OCRepresentation rep;
QueryParamsMap query;
query["rt"] = string(topicType);

if (psResource)  // MQ broker
{
    // requestQuery is passed as the targetUri parameter of createMQTopic,
    // so it must hold the URI for the new topic.
    OCStackResult result = psResource->createMQTopic(rep, requestQuery, query, onCreatedTopicCB, QualityOfService::LowQos);
    if (result != OC_STACK_OK)
    {
        return;
    }
}
else
{
    cout << "psResource is not valid" << endl;
}

The Publisher receives the information of the created topic through the callback.

void onCreatedTopicCB(const int ecode, string uri,
                                    shared_ptr<OC::OCResource> topic)
{
    cout << "response code: " << ecode << endl;
    if (ecode == OC_STACK_RESOURCE_CREATED)
    {
        cout << "OC_STACK_RESOURCE_CREATED received" << endl;
        if (topic)
        {
            // Get the topic URI
            cout << "URI of the topic: " << uri.c_str() << endl;
            pubTopic = topic;
        }
    }
}

4. Publish Topic (publisher -> broker)

A publisher can inform the broker of changes to its topic through the publish API, publishMQTopic(..). This API is exposed only when you build with the 'PUB' option (e.g. WITH_MQ=PUB).

OCRepresentation rep;
rep.setValue(string("blue"), string("on"));
rep.setValue(string("red"), string("off"));
QueryParamsMap query;
 
if (pubTopic)
{
    OCStackResult result = pubTopic->publishMQTopic(rep, query, onPublishedCB, QualityOfService::LowQos);
    if (result != OC_STACK_OK)
    {
        return;
    }
}
else
{
    cout << "pubTopic is not valid" << endl;
}

5. Request to start Publish (subscriber -> publisher)

If no subscriber wants the published information for a topic, transmitting publications for it is a waste of time and network resources.

If the Publisher starts publishing to the broker only once it knows that someone wants the topic information it holds, time and network resources are saved.

Please refer to the detailed sequence diagram at https://wiki.iotivity.org/message_queue_mq_for_publish-subscribe_interactions The requestMQPublish(..) API is provided for this function.

// Note: rep is built here but is not passed to requestMQPublish() below.
OCRepresentation rep;
rep.setValue(string("req_pub"), string("true"));
QueryParamsMap query;

if (targetTopic)
{
    OCStackResult result = targetTopic->requestMQPublish(query, onRequestMQPublishCB, QualityOfService::LowQos);
    if (result != OC_STACK_OK)
    {
        return;
    }
}
else
{
    cout << "targetTopic is not valid" << endl;
}
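
On the publisher side, the idea of staying silent until someone asks can be sketched as follows. This is only an illustration of the behaviour described in this step, not IoTivity code: `LazyPublisher`, `onPublishRequested` and `update` are hypothetical names, and the `send` callable stands in for whatever actually publishes to the broker.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical publisher-side helper; not part of IoTivity.
class LazyPublisher {
public:
    using Send = std::function<void(const std::string&)>;

    // `send` performs the actual publish to the broker.
    explicit LazyPublisher(Send send) : send_(std::move(send)) {
        assert(send_ && "publisher needs a send action");
    }

    // Call when a publish request for the topic arrives.
    void onPublishRequested() { requested_ = true; }

    // Call whenever the local state changes.
    // Returns true if the value was sent, false if it was skipped.
    bool update(const std::string& value) {
        if (!requested_) return false;  // nobody asked yet, so save the traffic
        send_(value);
        return true;
    }

private:
    Send send_;
    bool requested_ = false;
};
```

Until `onPublishRequested` is called, every `update` is dropped locally and nothing reaches the network.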
message_queue_mq_-_publish_subscribe_message.txt · Last modified: 2016/11/04 09:34 by jihwan seo