Lovegiver
#1
Hi,

I would appreciate some help with unsubscribing a durable subscriber from a topic.

I found many examples on the web, but none that matches my use case.

Here is my use case:

I have 3 topics: Topic1, Topic2 and Topic3
I have 3 durable subscribers: User1, User2 and User3

User1 is a durable subscriber to Topics 2 & 3
User2 is a durable subscriber to Topics 1 & 3
User3 is a durable subscriber to Topics 1 & 2

A MessageListener is set on each subscriber.


To register a subscriber (here, User1), I use:

Receiver receiver1 = new Receiver(user1);


Each user has its own ClientID (its name).

Then, to subscribe this user to topics (here, Topics 2 & 3), I use:

receiver1.receiveMessage(topic2);
receiver1.receiveMessage(topic3);


Each subscription has its own DurableID.
The DurableID is generated like this: ClientID + Topic_Name
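
As a small illustration of the naming rule above, the hypothetical helper below builds a DurableID from a ClientID and a topic name. `SubscriptionNames` and `durableId` are names invented for this sketch. Note that the Receiver class posted further down builds its name differently (topic, a hyphen, then the subscriber); either format should work as long as the same string is used when subscribing and when unsubscribing.

```java
public class SubscriptionNames {

    // Hypothetical helper: DurableID = ClientID + Topic_Name, as described above.
    static String durableId(String clientId, String topicName) {
        return clientId + topicName;
    }

    public static void main(String[] args) {
        // The same string would be passed to createDurableSubscriber(...)
        // and later to unsubscribe(...).
        String id = durableId("User1", "Topic3");
        System.out.println(id); // prints User1Topic3
    }
}
```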

Sometimes a user may want to unsubscribe from one particular topic, but not from all of them.

If User1 wants to unsubscribe from Topic3, I do something like this:

session.unsubscribe(DurableID);
with DurableID = User1Topic3


The problem is that when I try to unsubscribe, I get this error message:

"javax.jms.JMSException: Durable consumer is in use"

Calling "consumer.close()" beforehand doesn't help either.


I really don't see how to work around this problem. A user may want to stop a single subscription without dropping all the topics he is subscribed to.
Moreover, this is the user's decision, so I don't see how it could be done without the user being online.
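
The error can be reproduced without a broker in a small in-memory model. The sketch below is not ActiveMQ code: `ToyBroker` and its methods are hypothetical stand-ins that mimic only the rule documented for `javax.jms.Session.unsubscribe`, namely that a durable subscription cannot be deleted while it still has an active consumer.

```java
import java.util.HashSet;
import java.util.Set;

public class DurableInUseDemo {

    /** Hypothetical stand-in for a broker; it only tracks subscription state. */
    static class ToyBroker {
        private final Set<String> durable = new HashSet<>(); // known subscriptions
        private final Set<String> active = new HashSet<>();  // subscriptions with an open consumer

        void openConsumer(String name) {
            durable.add(name);
            active.add(name);
        }

        void closeConsumer(String name) {
            active.remove(name);
        }

        void unsubscribe(String name) {
            if (active.contains(name)) {
                throw new IllegalStateException("Durable consumer is in use");
            }
            durable.remove(name);
        }

        boolean exists(String name) {
            return durable.contains(name);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.openConsumer("User1Topic2");
        broker.openConsumer("User1Topic3");

        // Closing the Topic2 consumer does not release the Topic3 subscription.
        broker.closeConsumer("User1Topic2");
        try {
            broker.unsubscribe("User1Topic3");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Closing the matching consumer first lets the unsubscribe go through.
        broker.closeConsumer("User1Topic3");
        broker.unsubscribe("User1Topic3");
        System.out.println("Topic3 subscription exists: " + broker.exists("User1Topic3"));
        System.out.println("Topic2 subscription exists: " + broker.exists("User1Topic2"));
    }
}
```

In this model the first unsubscribe is rejected and the second succeeds, while the Topic2 subscription stays in place.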

Here is the code of my Receiver class. It's not clean: I have been trying everything, adding and deleting lines until it works.

Thanks for your help.


package com.citizenweb.classes;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.region.Destination;
import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;
import com.citizenweb.classes.Post;

public class Receiver implements MessageListener, Serializable {

	private static final long serialVersionUID = 1L;
	private ActiveMQConnectionFactory factory = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private Topic destination = null;
	private MessageConsumer consumer = null;

	UserIF userTopic = new User();
	UserIF userSubscriber = new User();
	List<Message> listeMsg = new ArrayList<Message>();

	public Receiver(UserIF subscriber) {
		this.userSubscriber = subscriber;
	}

	public void connect() {
		try {
			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
			// TODO: restore ActiveMQ's security mechanism in production
			factory.setTrustAllPackages(true);
			connection = (ActiveMQConnection) factory.createConnection();
			// ClientID :
			// https://qnalist.com/questions/2068823/create-durable-topic-subscriber
			connection.setClientID(userSubscriber.toString());
			connection.start();
			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void receiveMessage(UserIF topic) {
		try {
			if (session == null) {
				connect();
			}
			destination = session.createTopic(topic.toString());
			String nomAbonnement = topic.toString() + "-" + userSubscriber.toString();
			consumer = session.createDurableSubscriber(destination, nomAbonnement);
			consumer.setMessageListener(this);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void unsubscribe(UserIF topic) throws JMSException {
		try {
			if (connection == null) {
				connect();
			}
			System.out.println("\n RECEIVER Unsubscribing from topic " + topic.toString());
			String nomAbonnement = topic.toString() + "-" + userSubscriber.toString();
			System.out.println("\n RECEIVER Subscription to close = " + nomAbonnement);
			consumer.close();
			session.unsubscribe(nomAbonnement);
			System.out.println("\n RECEIVER " + userSubscriber.toString() + " has unsubscribed from " + nomAbonnement);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void onMessage(Message message) {
		System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString());
		listeMsg.add(message);
		System.out.println("\n RECEIVER Number of messages received by " + userSubscriber + " = " + listeMsg.size());
		String classe = message.getClass().getSimpleName();
		System.out.println("\n RECEIVER Message class: " + classe);
		try {
			if (message instanceof TextMessage) {
				TextMessage text = (TextMessage) message;
				System.out.println("\n RECEIVER Information : " + text.getText());
			}
			if (message instanceof ObjectMessage) {
				System.out.println("\n RECEIVER ObjectMessage");
				ObjectMessage oMessage = (ObjectMessage) message;
				if (oMessage.getObject() instanceof PostIF) {
					PostIF post = (PostIF) oMessage.getObject();
					String s = ((Post) post).getCorpsMessage();
					System.out.println("\n RECEIVER Post : " + s);
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws JMSException {

		/*
		 * EACH USER IS A TOPIC FOR OTHER USERS
		 * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS
		*/
		
		//CREATE USER
		UserIF u1, u2, u3;
		String[] nom = new String[5];
		String[] prenom = new String[5];
		String[] login = new String[5];
		String[] password = new String[5];
		Date[] naiss = new Date[5];
		String[] mail = new String[5];
		for (int i = 0; i < 5; i++) {
			nom[i] = "nom_" + i;
			prenom[i] = "prenom_" + i;
			login[i] = "login_" + i;
			password[i] = "password_" + i;
			naiss[i] = new Date();
			mail[i] = "mail_" + i;
		}

		u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
		u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
		u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
		
		/*
		 * MAKE EACH USER A SUBSCRIBER
		 */
		Receiver receiver1 = new Receiver(u1);
		Receiver receiver2 = new Receiver(u2);
		Receiver receiver3 = new Receiver(u3);
		
		/*
		 * PUT A MESSAGE LISTENER FOR EACH USER
		 */
		receiver1.receiveMessage(u2);
		receiver1.receiveMessage(u3);
		receiver2.receiveMessage(u1);
		receiver2.receiveMessage(u3);
		receiver3.receiveMessage(u1);
		receiver3.receiveMessage(u2);
		
		receiver1.consumer.close();
		receiver2.consumer.close();
		receiver3.consumer.close();
		
		/*
		 * CALL THE SENDER CLASS TO SEND MESSAGES
		 */
		try {
			Sender.main(args);
		} catch (Exception e1) {
			e1.printStackTrace();
		}
		
		receiver1.unsubscribe(u2);
		receiver2.unsubscribe(u3);
		receiver3.unsubscribe(u1);
	}
}
Lovegiver
#2
Unsubscribe a durable subscriber from a topic [Solution]
Solution
==========================


Hello,

I've found the solution and its explanation.

Each connection needs *a unique ClientID*: **connection.setClientID("clientID");**

My mistake was to assume that this uniqueness applied per client.

When a client subscribes to a Topic, there is one connection for this Topic. So a client subscribed to 3 topics, for instance, needs 3 connections and therefore 3 ClientIDs.
A ClientID has to be unique because it identifies one connection of one client for one topic.

That's why I got so many JMSExceptions saying "Durable consumer is in use" when I tried to end a subscription.
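
The approach described above can be sketched as follows, assuming the ClientID is derived from the user and the topic. `clientIdFor` and its user-topic format are inventions of this sketch, and the `Set` merely imitates a broker refusing a ClientID that is already connected; no real connection is opened.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ClientIdPerTopic {

    // Hypothetical format for the value that would be passed to connection.setClientID(...).
    static String clientIdFor(String user, String topic) {
        return user + "-" + topic;
    }

    public static void main(String[] args) {
        String user = "User1";
        List<String> topics = Arrays.asList("Topic2", "Topic3");

        // Stand-in for the broker's view of connected ClientIDs.
        Set<String> idsInUse = new HashSet<>();
        for (String topic : topics) {
            String id = clientIdFor(user, topic);
            // Set.add returns false for a duplicate, which is where a broker would reject the ID.
            boolean accepted = idsInUse.add(id);
            System.out.println(id + " accepted: " + accepted);
        }
    }
}
```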

Thanks to all of you who gave me your time and support.
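
Another reading of the posted Receiver class, offered as an assumption rather than a confirmed diagnosis: it stores a single consumer field, so the second call to receiveMessage replaces the reference to the first consumer, and consumer.close() then closes only the most recent one. The sketch below keeps one consumer per topic instead, so that the matching one can be closed before unsubscribing. It is a plain-Java model: `FakeConsumer` and `PerTopicConsumers` are hypothetical stand-ins, not ActiveMQ or JMS classes.

```java
import java.util.HashMap;
import java.util.Map;

public class PerTopicConsumers {

    /** Hypothetical stand-in for javax.jms.MessageConsumer. */
    static class FakeConsumer {
        final String subscriptionName;
        boolean closed = false;

        FakeConsumer(String subscriptionName) {
            this.subscriptionName = subscriptionName;
        }

        void close() {
            closed = true;
        }
    }

    private final String user;
    private final Map<String, FakeConsumer> consumersByTopic = new HashMap<>();

    PerTopicConsumers(String user) {
        this.user = user;
    }

    // Remembers one consumer per topic instead of overwriting a single field.
    void subscribe(String topic) {
        consumersByTopic.computeIfAbsent(topic, t -> new FakeConsumer(t + "-" + user));
    }

    // Closes only the consumer for this topic and returns its subscription name,
    // or null if the user has no consumer for that topic.
    String unsubscribe(String topic) {
        FakeConsumer consumer = consumersByTopic.remove(topic);
        if (consumer == null) {
            return null;
        }
        consumer.close();
        return consumer.subscriptionName;
    }

    boolean isActive(String topic) {
        return consumersByTopic.containsKey(topic);
    }

    public static void main(String[] args) {
        PerTopicConsumers receiver = new PerTopicConsumers("User1");
        receiver.subscribe("Topic2");
        receiver.subscribe("Topic3");

        String closedName = receiver.unsubscribe("Topic3");
        System.out.println("closed subscription: " + closedName);
        System.out.println("Topic2 still active: " + receiver.isActive("Topic2"));
    }
}
```

In real JMS code, the returned name would be passed to session.unsubscribe(...) once the matching MessageConsumer has been closed.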