ChronicleQueue: how to remove an object from the queue, or how to update an object


Hi, I want a simple persistent queue for a single process. It should record (add to the queue) any SQL operation that fails; then, when the SQL server is back up, it should send all remaining objects from the queue to the SQL server. If a record fails again while being sent, it should not be removed from the queue.

I have written a simple test application that writes some objects to the queue and reads them back.

I ran into two problems with the test application. The first is that I had to hard-code the constant 3 in the read loop (TestQueue#pushTest pushes 3 objects to the queue), but in real life I need to read however many entries the queue currently holds.

...
for (int i = 0; i < 3; i++) {
...

The second problem is in TestQueue#tryToSend. The first time I run TestQueue#tryToSend, it correctly dumps all 3 objects. But when I run the same test a second time, I still get the same 3 objects dumped. I expected no output, because I thought that calling tailer.readingDocument() would delete the entries it returns. I want an object to be deleted from the queue once its SQL operation completes successfully, and to stay in the queue otherwise.

At the moment I cannot delete an object: I do not know which method can do this, or how to flag an object as processed or not.

Since I currently see no solution to my problem in chronicle-queue, I have migrated to https://github.com/square/tape, although it has not been updated for a long time. If you find a solution to my problem, I will migrate back to Chronicle Queue.

My code is shown below.

With dependencies:

        <dependency>
            <groupId>net.openhft</groupId>
            <artifactId>chronicle-queue</artifactId>
            <version>5.22.18</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>

import model.LogData;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;

import java.sql.Timestamp;

public class ExampleMain {

    private static final SingleChronicleQueue singleChronicleQueue;

    static {
        singleChronicleQueue = SingleChronicleQueueBuilder.single("logqueue").rollCycle(RollCycles.FAST_DAILY).build();
    }


    public static final void pushQueue(String param, String requestIp, Timestamp requestTime,
                                       Timestamp responseTime, String returnCode) {
        ExcerptAppender appender = singleChronicleQueue.acquireAppender();


        try (final DocumentContext document =
                     appender.acquireWritingDocument(false)) {
            document
                    .wire()
                    .bytes()
                    .writeObject(LogData.class,
                            new LogData(param, requestIp, requestTime, responseTime, returnCode) );

        }
        appender.close();
    }

    public static void processQueueAndRemoveProcessed() {
        final ExcerptTailer tailer = singleChronicleQueue.createTailer();
        final long lastReadIndex = tailer.lastReadIndex();
//        tailer.toStart().direction(TailerDirection.FORWARD);
//        tailer.moveToIndex(82038170320897L);

//        why do i need to put 3 ? how can i get all current queue size ?
        for (int i = 0; i < 3; i++) {
            try (DocumentContext document = tailer.readingDocument()) {
                if (document.isPresent()) {
                    long index = tailer.index();
                    final Wire wire = document.wire();
                    final Bytes<?> bytes = wire.bytes();
                    LogData logData = bytes.readObject(LogData.class);

                    System.out.println(index);
                    System.out.println(logData);

//                    how to remove current logData from queue ?
                }
            }
        }
        tailer.close();
    }

}

Simple Test Application

import org.junit.jupiter.api.Test;

import java.sql.Timestamp;

public class TestQueue {
    @Test
    public void pushTest() {
        final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
        ExampleMain.pushQueue("param1","127.0.0.1" , timestamp , timestamp , "200" );
        ExampleMain.pushQueue("param1","192.168.0.2" , timestamp , timestamp , "403" );
        ExampleMain.pushQueue("param1","127.0.0.1" , timestamp , timestamp , "423" );
    }

    @Test
    public void tryToSend() {
        ExampleMain.processQueueAndRemoveProcessed();
    }
}

There are 2 best solutions below

2
On

Chronicle Queue does not support removal of entries - it is an immutable, append-only event store.

You can, however, archive or remove old data once a cycle file is no longer in use - see roll cycles in the Chronicle Queue documentation.
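To make the append-only idea concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not the Chronicle Queue API: the class AppendOnlyLogSketch and its methods are hypothetical names invented for illustration. It shows that nothing needs to be deleted if the consumer only advances its read position past entries that were sent successfully, and that the loop can run until no unread entry is left instead of using a fixed count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy in-memory model of an append-only log; this is NOT the Chronicle Queue API. */
public class AppendOnlyLogSketch {
    private final List<String> entries = new ArrayList<>(); // entries are never removed
    private int nextToSend = 0;                              // read position of the single consumer

    public void append(String entry) {
        entries.add(entry);
    }

    public int size() {
        return entries.size();
    }

    /**
     * Offers each unsent entry to the sender, in order. Stops at the first
     * failure so that the failed entry is offered again on the next call.
     * Returns the number of entries sent during this call.
     */
    public int drain(Predicate<String> sender) {
        int sent = 0;
        while (nextToSend < entries.size() && sender.test(entries.get(nextToSend))) {
            nextToSend++;
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        AppendOnlyLogSketch log = new AppendOnlyLogSketch();
        log.append("insert-1");
        log.append("insert-2");
        log.append("insert-3");

        // Hypothetical sender: pretend the SQL server rejects "insert-2" for now.
        System.out.println(log.drain(e -> !e.equals("insert-2"))); // prints 1
        // The server is healthy again: the failed entry and the one after it go out.
        System.out.println(log.drain(e -> true));                  // prints 2
        // Nothing is left to send, although the log still holds all 3 entries.
        System.out.println(log.drain(e -> true));                  // prints 0
    }
}
```

Stopping at the first failure keeps entries in order; a real consumer might prefer to skip a bad entry and record it elsewhere, which is a design choice this sketch does not cover.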

0
On

When creating a new ExcerptTailer instance, I had to pass a name:

final ExcerptTailer tailer = singleChronicleQueue.createTailer("a");

My mistake was:

final ExcerptTailer tailer = singleChronicleQueue.createTailer( );

Calling createTailer with no arguments does not record the read index, so every time you restart the app it begins reading from the start of the queue.
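The difference between the two calls can be pictured with a small plain-Java model. This is only a sketch of the behaviour described above, not how Chronicle Queue stores tailer indexes: NamedCursorSketch, the ".cursor" file suffix and the one-file-per-name layout are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Toy model of a read position that may or may not survive a restart; NOT the Chronicle Queue API. */
public class NamedCursorSketch {
    private Path file;     // null for an anonymous cursor, which lives in memory only
    private long position; // index of the next entry to read

    /** Pass name == null to model an anonymous cursor, or a name to model a persisted one. */
    public NamedCursorSketch(Path dir, String name) {
        try {
            if (name != null) {
                Files.createDirectories(dir);
                file = dir.resolve(name + ".cursor"); // hypothetical on-disk layout
            }
            // A named cursor resumes from its saved position; anything else starts at 0.
            position = (file != null && Files.exists(file))
                    ? Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim())
                    : 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public long position() {
        return position;
    }

    /** Moves past one entry and, for a named cursor, saves the new position to disk. */
    public void advance() {
        position++;
        if (file != null) {
            try {
                Files.write(file, Long.toString(position).getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

In this model, creating a second NamedCursorSketch with the same directory and name plays the role of restarting the app: it picks up where the first one stopped, while a cursor created with a null name always starts again from position 0.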

Thanks for the info! I have fixed the problem.