This is the class I defined; it has a static field pubMsgs.

class storage{
    private static Map<String, List<String>> subLists = new HashMap<>();
    private static Map<String, List<Request>> pubMsgs = new HashMap<>();
    private static Lock lock = new ReentrantLock();

    public static void printPubMsgs(){
        lock.lock();
        Set<String> keyss = subLists.keySet();
        System.out.println("keyss:" + keyss);
        Set<String> keys = pubMsgs.keySet();
        System.out.println("keys:" + keys);
        for (String key : keys) {
            List<Request> list = pubMsgs.get(key);
            System.out.println("list1:" + list.toString());
            Iterator<Request> iterator = list.listIterator();
            while (iterator.hasNext()) {
                System.out.println(iterator);
                System.out.println("here");
            }
        }
        lock.unlock();
    }

}
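One detail in the snippet above is worth checking independently of the empty map: the inner loop tests iterator.hasNext() but never calls iterator.next(), so for a non-empty list it would not terminate, and an exception before lock.unlock() would leave the lock held. A minimal sketch of a safer print loop, assuming String in place of Request and a hypothetical class name PrintSketch (neither is from the original project):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the storage class; String replaces Request.
class PrintSketch {
    private static final Map<String, List<String>> pubMsgs = new HashMap<>();
    private static final Lock lock = new ReentrantLock();

    // Stores one message under its source key.
    static void put(String source, String msg) {
        lock.lock();
        try {
            pubMsgs.computeIfAbsent(source, k -> new ArrayList<>()).add(msg);
        } finally {
            lock.unlock(); // released even if the body throws
        }
    }

    // Prints every stored message and also returns the printed lines.
    static List<String> printPubMsgs() {
        List<String> printed = new ArrayList<>();
        lock.lock();
        try {
            for (Map.Entry<String, List<String>> entry : pubMsgs.entrySet()) {
                // The for-each loop advances the iterator on every pass.
                for (String msg : entry.getValue()) {
                    String line = entry.getKey() + ": " + msg;
                    System.out.println(line);
                    printed.add(line);
                }
            }
        } finally {
            lock.unlock();
        }
        return printed;
    }
}
```

This does not by itself explain why the map appears empty; it only removes two sources of confusion while debugging.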

I call its printPubMsgs method from other places, but it always prints empty collections, even though I'm sure I added elements beforehand.

I tried debugging and found that the size of the map inside printPubMsgs is 0. While debugging, I confirmed that it should not be 0: when another thread sends a request to storage, it still gets a result back.
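A static field is shared only within one JVM process and one loaded copy of the class. If printPubMsgs were called from a different process than the one running the Topic threads (for example from a separately started publisher or subscriber program), it would see its own, empty maps. Whether that is what happens here cannot be told from the code shown, so the following is only a diagnostic sketch; RuntimeProbe and describe are hypothetical names, not part of the original project:

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper for debugging; not part of the original project.
class RuntimeProbe {
    // Describes the JVM, class object, class loader and thread of the caller.
    static String describe(Class<?> owner) {
        // Often looks like "12345@hostname", but the format is JVM-specific.
        String jvmName = ManagementFactory.getRuntimeMXBean().getName();
        // Distinct Class objects (e.g. from different class loaders)
        // each have their own static fields.
        int classIdentity = System.identityHashCode(owner);
        return "jvm=" + jvmName
                + " class@" + Integer.toHexString(classIdentity)
                + " loader=" + owner.getClassLoader()
                + " thread=" + Thread.currentThread().getName();
    }
}
```

Printing RuntimeProbe.describe(storage.class) at the start of both addMessage and printPubMsgs would show whether the two calls run in the same JVM and against the same class object; if the jvm or class values differ, the two calls are not looking at the same map.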


Let me explain a little more clearly. First, the publisher publishes some messages. These messages are received by an instance of the Topic class, and that instance calls the addMessage method of the storage class to store them.

public static void addMessage(Request request) {
    lock.lock();
    if (pubMsgs.containsKey(request.getSource())) {
        pubMsgs.get(request.getSource()).add(request);
    } else {
        List<Request> list = new ArrayList<>();
        list.add(request);
        pubMsgs.put(request.getSource(), list);
    }
    lock.unlock();
}
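The containsKey/get/put sequence above can be written more compactly with Map.computeIfAbsent, and the lock is safer when released in a finally block. A minimal sketch under the assumption that String stands in for Request; the class name AddMessageSketch and the count helper are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the storage class; String replaces Request.
class AddMessageSketch {
    private static final Map<String, List<String>> pubMsgs = new HashMap<>();
    private static final Lock lock = new ReentrantLock();

    // Appends a message to the list for its source, creating the list if needed.
    static void addMessage(String source, String body) {
        lock.lock();
        try {
            pubMsgs.computeIfAbsent(source, k -> new ArrayList<>()).add(body);
        } finally {
            lock.unlock(); // always released, even if add throws
        }
    }

    // Returns how many messages are stored for the given source.
    static int count(String source) {
        lock.lock();
        try {
            List<String> list = pubMsgs.get(source);
            return list == null ? 0 : list.size();
        } finally {
            lock.unlock();
        }
    }
}
```

The behavior is intended to match the original addMessage; only the structure changes.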

This project simulates a message queue. I use multithreading to simulate the concurrency scenario of multiple subscribers and publishers.


The key code of my project is posted below.
Publisher.java

@Override
public void run() {
    try (Socket socket = new Socket(address, port);
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());) {

        oos.writeObject(new Request(pubHead, msg, id));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Subscriber.java

@Override
    public void run() {
        try (Socket s = new Socket(address, port);
            ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());) {
            oos.writeObject(new Request(subHead, sub_id, id));
            oos.close();
        } catch (Exception e) {

            e.printStackTrace();
        }

        try (Socket socket = new Socket(address, port);
            ObjectOutputStream oos1 = new ObjectOutputStream(socket.getOutputStream());) {
            oos1.writeObject(new Request(getHead, "", id));
//            sleep(1000);
            while (true) {
                ObjectInputStream ooi = new ObjectInputStream(socket.getInputStream());
                Request req = (Request) ooi.readObject();
                if (Objects.equals(req.getHead(), wrongHead)) {
                    System.out.println(req.getBody());
                    break;
                } else if (Objects.equals(req.getHead(), completeHead)) {
                    System.out.println(req.getBody());
                    break;
                }
                else{
                    System.out.println(req.getBody());
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Topic.java

@Override
public void run() {
    storage.update();
    try(ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());){
        Request req = (Request) ois.readObject();
        if (req.getHead().equals(pubHead)) {
            System.out.println(">>> Successfully receive publish request");
            storage.addMessage(req);
        } else if (req.getHead().equals(getHead)) {
            System.out.println(">>> Successfully receive get request");
            List<Request> res = storage.getMessage(req.getSource());
            if (res == null) {
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                oos.writeObject(new Request(wrongHead, "you should subscribe first", "topic"));
            } else if (res.isEmpty()) {
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                oos.writeObject(new Request(wrongHead, "there are no messages from whom you subscribe", "topic"));
            } else {
                for (Request re : res) {
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    oos.writeObject(re);
                }
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                oos.writeObject(new Request(completeHead, ">>> all messages have been gotten", "topic"));
            }
        } else if (req.getHead().equals(subHead)) {
            storage.accessSub(req.getSource(), req.getBody());
            System.out.println(">>> Successfully receive subscribe request");
        }
    }catch (Exception e){
        e.printStackTrace();
    }
}
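In the Topic code above, a new ObjectOutputStream is created for every reply, and the subscriber creates a new ObjectInputStream for every read. Each stream constructor writes or reads its own header, so the two sides have to stay exactly in step. A common alternative is one stream per direction for the lifetime of the connection. The sketch below illustrates that idea in memory, assuming String messages instead of Request and a hypothetical end marker; StreamSketch and its methods are illustrative names only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; String replaces Request.
class StreamSketch {
    // Hypothetical end-of-batch marker, playing the role of completeHead.
    static final String END = "__END__";

    // Writes all messages through ONE ObjectOutputStream, then the marker.
    static void writeAll(OutputStream out, List<String> messages) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(out);
        for (String message : messages) {
            oos.writeObject(message);
        }
        oos.writeObject(END);
        oos.flush();
    }

    // Reads messages through ONE ObjectInputStream until the marker arrives.
    static List<String> readAll(InputStream in) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(in);
        List<String> received = new ArrayList<>();
        while (true) {
            String message = (String) ois.readObject();
            if (END.equals(message)) {
                break;
            }
            received.add(message);
        }
        return received;
    }

    // In-memory round trip, standing in for the socket's two ends.
    static List<String> roundTrip(List<String> messages) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeAll(buffer, messages);
            return readAll(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With a real socket, writeAll would receive socket.getOutputStream() on the Topic side and readAll would receive socket.getInputStream() on the subscriber side. This is a design alternative and is not known to be related to the empty map.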

Storage.java

class storage{
    private static Map<String, List<String>> subLists = new HashMap<>();
    private static Map<String, List<Request>> pubMsgs = new HashMap<>();
    private static Lock lock = new ReentrantLock();

    public static void printPubMsgs(){
        lock.lock();
        Set<String> keyss = subLists.keySet();
        System.out.println("keyss:" + keyss);
        Set<String> keys = pubMsgs.keySet();
        System.out.println("keys:" + keys);
        for (String key : keys) {
            List<Request> list = pubMsgs.get(key);
            System.out.println("list1:" + list.toString());
            Iterator<Request> iterator = list.listIterator();
            while (iterator.hasNext()) {
                System.out.println(iterator);
                System.out.println("here");
            }
        }
        lock.unlock();
    }

    public static void addMessage(Request request) {
        lock.lock();
        if (pubMsgs.containsKey(request.getSource())) {
            pubMsgs.get(request.getSource()).add(request);
        } else {
            List<Request> list = new ArrayList<>();
            list.add(request);
            pubMsgs.put(request.getSource(), list);
        }
        lock.unlock();
    }

    public static List<Request> getMessage(String source) {
        lock.lock();
        List<Request> result = new ArrayList<>();
        if (subLists.containsKey(source)) {
            List<String> list = subLists.get(source);
            for (String s : list) {
                if (!pubMsgs.containsKey(s)) continue;
                for (Request req : pubMsgs.get(s)) {
                    result.add(new Request(req.getHead(), req.getBody(), req.getSource()));
                }
            }
        } else {
            lock.unlock();
            return null; // wrong request
        }
        lock.unlock();
        return result; // if empty --> wrong request
    }

    public static void accessSub(String source, String destination) {
        lock.lock();
        if (subLists.containsKey(source)) {
            subLists.get(source).add(destination);
        } else {
            List<String> list = new ArrayList<>();
            list.add(destination);
            subLists.put(source, list);
        }
        lock.unlock();
    }

    /**
     * Removes messages that have existed for more than 1000 seconds.
     */
    public static void update() {
        lock.lock();
        Set<String> keys = pubMsgs.keySet();
        for (String key : keys) {
            List<Request> list = pubMsgs.get(key);
            Iterator<Request> iterator = list.listIterator();
            while (iterator.hasNext()) {
                Date now = new Date();
                Date date = iterator.next().getDate();
                if ((now.getTime() - date.getTime()) / 1000 > 1000) {
                    iterator.remove();
                    System.out.println("test");
                }
            }
        }
        lock.unlock();
    }
}

Any help on this is appreciated.
