Lovegiver
#1
Hi,

I would appreciate some help with unsubscribing a durable subscriber from a topic.

I found many examples on the web, but none that match my use case.

Here is my use case:

I have 3 topics: Topic1, Topic2 and Topic3.
I have 3 durable subscribers: User1, User2 and User3.

User1 is a durable subscriber for Topic2 and Topic3.
User2 is a durable subscriber for Topic1 and Topic3.
User3 is a durable subscriber for Topic1 and Topic2.

A MessageListener is registered for each subscriber.


To register a subscriber (here: User1), I use:

Receiver receiver1 = new Receiver(user1);


Each user has its own ClientID (the user's name).

Then, to subscribe this user to topics (here: Topic2 and Topic3), I use:

receiver1.receiveMessage(topic2);
receiver1.receiveMessage(topic3);


Each subscription has its own DurableID.
The DurableID is generated like this: ClientID + Topic_Name
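
A minimal sketch of that naming rule, assuming the ClientID and the topic name are plain strings. `DurableIds` and `durableId` are hypothetical names used only for illustration; note that the Receiver class further down builds its name slightly differently (topic, a dash, then the subscriber).

```java
/** Hypothetical helper mirroring the rule: DurableID = ClientID + Topic_Name. */
final class DurableIds {

    private DurableIds() {
        // Utility class, not meant to be instantiated.
    }

    /** Concatenates the ClientID and the topic name, rejecting empty parts. */
    static String durableId(String clientId, String topicName) {
        if (clientId == null || clientId.isEmpty()
                || topicName == null || topicName.isEmpty()) {
            throw new IllegalArgumentException("clientId and topicName must be non-empty");
        }
        return clientId + topicName;
    }
}
```

With this rule, `DurableIds.durableId("User1", "Topic3")` gives `User1Topic3`, the value used in the unsubscribe call.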

Sometimes a user may want to unsubscribe from one particular topic, but not from all of them.

If User1 wants to unsubscribe from Topic3, I do something like this:

session.unsubscribe(DurableID);
with DurableID = User1Topic3


The problem is that when I try to unsubscribe, I get this error message:

"javax.jms.JMSException: Durable consumer is in use"

If I call "consumer.close()" first, it doesn't work either.


I really don't see how to work around this problem. A user may want to stop a single subscription without dropping all the topics he is subscribed to.
Moreover, this decision belongs to the user. I don't see how it could be made without the user being online.

Here is the code of my Receiver class. It's not clean: I have been trying everything, adding and deleting lines to get it working.

Thanks for your help.


package com.citizenweb.classes;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;

public class Receiver implements MessageListener, Serializable {

	private static final long serialVersionUID = 1L;
	private ActiveMQConnectionFactory factory = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private Topic destination = null;
	private MessageConsumer consumer = null;

	UserIF userTopic = new User();
	UserIF userSubscriber = new User();
	List<Message> listeMsg = new ArrayList<Message>();

	public Receiver(UserIF subscriber) {
		this.userSubscriber = subscriber;
	}

	public void connect() {
		try {
			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
			// TODO Restore ActiveMQ's security mechanism (trusted packages) in production
			factory.setTrustAllPackages(true);
			connection = (ActiveMQConnection) factory.createConnection();
			// ClientID :
			// https://qnalist.com/questions/2068823/create-durable-topic-subscriber
			connection.setClientID(userSubscriber.toString());
			connection.start();
			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void receiveMessage(UserIF topic) {
		try {
			if (session == null) {
				connect();
			}
			destination = session.createTopic(topic.toString());
			String nomAbonnement = topic.toString() + "-" + userSubscriber.toString();
			consumer = session.createDurableSubscriber(destination, nomAbonnement);
			consumer.setMessageListener(this);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void unsubscribe(UserIF topic) throws JMSException {
		try {
			if (connection == null) {
				connect();
			}
			System.out.println("\n RECEIVER Unsubscribing from topic " + topic.toString());
			String nomAbonnement = topic.toString() + "-" + userSubscriber.toString();
			System.out.println("\n RECEIVER Subscription to close = " + nomAbonnement);
			consumer.close();
			session.unsubscribe(nomAbonnement);
			System.out.println("\n RECEIVER " + userSubscriber.toString() + " unsubscribed from " + nomAbonnement);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void onMessage(Message message) {
		System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString());
		listeMsg.add(message);
		System.out.println("\n RECEIVER Number of messages received by " + userSubscriber + " = " + listeMsg.size());
		String classe = message.getClass().getSimpleName();
		System.out.println("\n RECEIVER Message class: " + classe);
		try {
			if (message instanceof TextMessage) {
				TextMessage text = (TextMessage) message;
				System.out.println("\n RECEIVER Information : " + text.getText());
			}
			if (message instanceof ObjectMessage) {
				System.out.println("\n RECEIVER ObjectMessage");
				ObjectMessage oMessage = (ObjectMessage) message;
				if (oMessage.getObject() instanceof PostIF) {
					PostIF post = (PostIF) oMessage.getObject();
					String s = ((Post) post).getCorpsMessage();
					System.out.println("\n RECEIVER Post : " + s);
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws JMSException {

		/*
		 * EACH USER IS A TOPIC FOR OTHER USERS
		 * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS
		*/
		
		//CREATE USER
		UserIF u1, u2, u3;
		String[] nom = new String[5];
		String[] prenom = new String[5];
		String[] login = new String[5];
		String[] password = new String[5];
		Date[] naiss = new Date[5];
		String[] mail = new String[5];
		for (int i = 0; i < 5; i++) {
			nom[i] = "nom_" + i;
			prenom[i] = "prenom_" + i;
			login[i] = "login_" + i;
			password[i] = "password_" + i;
			naiss[i] = new Date();
			mail[i] = "mail_" + i;
		}

		u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
		u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
		u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
		
		/*
		 * MAKE EACH USER A SUBSCRIBER
		 */
		Receiver receiver1 = new Receiver(u1);
		Receiver receiver2 = new Receiver(u2);
		Receiver receiver3 = new Receiver(u3);
		
		/*
		 * PUT A MESSAGE LISTENER FOR EACH USER
		 */
		receiver1.receiveMessage(u2);
		receiver1.receiveMessage(u3);
		receiver2.receiveMessage(u1);
		receiver2.receiveMessage(u3);
		receiver3.receiveMessage(u1);
		receiver3.receiveMessage(u2);
		
		receiver1.consumer.close();
		receiver2.consumer.close();
		receiver3.consumer.close();
		
		/*
		 * CALL THE SENDER CLASS TO SEND MESSAGES
		 */
		try {
			Sender.main(args);
		} catch (Exception e1) {
			e1.printStackTrace();
		}
		
		receiver1.unsubscribe(u2);
		receiver2.unsubscribe(u3);
		receiver3.unsubscribe(u1);
	}
}
Lovegiver
#2
Unsubscribe a durable subscriber from a topic [Solution]
Solution
==========================


Hello,

I found the solution and its explanation.

Each connection needs *a unique ClientID*: **connection.setClientID("clientID");**

My mistake was to think that this uniqueness was per client.

In my setup, each topic subscription gets its own connection. So a client subscribed to 3 topics (for instance) needs 3 connections, and therefore 3 ClientIDs.
A ClientID has to be unique because it identifies one connection: here, the connection of one client for one topic.
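
As a rough sketch of that rule, the ClientIDs for one user can be derived from the user name and the topic name, one per connection. `ConnectionPlan`, `clientIdFor`, `clientIdsFor` and the `@` separator are assumptions made for illustration, not part of ActiveMQ; each returned value would be passed to connection.setClientID(...) on its own connection.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: one ClientID per (user, topic) connection. */
final class ConnectionPlan {

    private ConnectionPlan() {
        // Utility class, not meant to be instantiated.
    }

    /** Assumed naming rule; it presumes neither name contains '@'. */
    static String clientIdFor(String user, String topic) {
        return user + "@" + topic;
    }

    /** Returns one ClientID per topic, failing if two of them would collide. */
    static List<String> clientIdsFor(String user, List<String> topics) {
        Set<String> seen = new HashSet<>();
        List<String> ids = new ArrayList<>();
        for (String topic : topics) {
            String id = clientIdFor(user, topic);
            if (!seen.add(id)) {
                throw new IllegalArgumentException("Duplicate ClientID: " + id);
            }
            ids.add(id);
        }
        return ids;
    }
}
```

For User1 subscribed to Topic2 and Topic3, this yields `User1@Topic2` and `User1@Topic3`, so the two connections never share a ClientID.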

That's why I got so many JMSExceptions saying "Durable consumer is in use" when I tried to end a subscription.
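
The ordering that matters when ending one subscription (close that subscription's consumer, then unsubscribe by name) can be sketched without a broker, since JMS does not allow removing a durable subscription while a consumer on it is still active. The Receiver class in post #1 keeps a single `consumer` field, so a second receiveMessage call replaces the reference to the first consumer, and close() only reaches the latest one. The sketch below keeps one handle per subscription name instead. `DurableSubscriptions`, `ConsumerHandle` and `Unsubscriber` are hypothetical names: the two interfaces are stand-ins for javax.jms.MessageConsumer and for the session.unsubscribe(name) call, not ActiveMQ API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical bookkeeping: one consumer handle per durable subscription name. */
final class DurableSubscriptions {

    /** Stand-in for javax.jms.MessageConsumer; only close() is needed here. */
    interface ConsumerHandle {
        void close();
    }

    /** Stand-in for the session.unsubscribe(name) call. */
    interface Unsubscriber {
        void unsubscribe(String subscriptionName);
    }

    private final Map<String, ConsumerHandle> consumers = new LinkedHashMap<>();

    /** Remembers the consumer created for a subscription instead of overwriting a single field. */
    void register(String subscriptionName, ConsumerHandle consumer) {
        if (consumers.containsKey(subscriptionName)) {
            throw new IllegalStateException("Already subscribed: " + subscriptionName);
        }
        consumers.put(subscriptionName, consumer);
    }

    /** Closes the matching consumer (if one is open), then removes the durable subscription. */
    void unsubscribe(String subscriptionName, Unsubscriber session) {
        ConsumerHandle consumer = consumers.remove(subscriptionName);
        if (consumer != null) {
            consumer.close();
        }
        session.unsubscribe(subscriptionName);
    }

    /** Names of the subscriptions whose consumers are still open. */
    Set<String> active() {
        return consumers.keySet();
    }
}
```

In a real client, register would be called right after session.createDurableSubscriber(...), and the other subscriptions of the same user stay untouched when one of them is removed.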

Thanks to all of you who gave me your time and support.