Hi, I want a simple persistent queue for a single process. Whenever a SQL operation fails, the object should be recorded (added to the queue); then, when the SQL server comes back up, all remaining objects in the queue should be sent back to the SQL server. If a record from the queue fails again while being sent, it should not be removed from the queue.
I have tried a simple test application that records some objects to the queue and reads them back.
In the test application I ran into two problems. First, I had to hard-code the constant 3 (TestQueue#pushTest pushes 3 objects onto the queue), but in real life I need to iterate over however many entries the queue currently holds:
...
for (int i = 0; i < 3; i++) {
...
The second problem is in TestQueue#tryToSend. The first time I run the test it dumps all 3 objects correctly, but on a second run of the same test I still get the 3 objects dumped. I expected no results, because I thought that calling tailer.readingDocument() would delete the entries. What I want is to delete an object from the queue only when its SQL operation completes successfully; otherwise the object should stay in the queue.
Currently I can't delete the object; I don't know which method provides this capability, or how to flag whether an object has been processed or not.
Since I couldn't see a solution to my problem with Chronicle Queue, I have migrated to https://github.com/square/tape, although it has not been updated in a long time. If there is a solution to my problem, I will migrate back to Chronicle Queue.
My code is shown below.
Dependencies:
<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>chronicle-queue</artifactId>
    <version>5.22.18</version>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>RELEASE</version>
    <scope>test</scope>
</dependency>
import model.LogData;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;

import java.sql.Timestamp;

public class ExampleMain {

    private static final SingleChronicleQueue singleChronicleQueue;

    static {
        singleChronicleQueue = SingleChronicleQueueBuilder.single("logqueue")
                .rollCycle(RollCycles.FAST_DAILY)
                .build();
    }

    public static final void pushQueue(String param, String requestIp, Timestamp requestTime,
                                       Timestamp responseTime, String returnCode) {
        ExcerptAppender appender = singleChronicleQueue.acquireAppender();
        try (final DocumentContext document = appender.acquireWritingDocument(false)) {
            document.wire()
                    .bytes()
                    .writeObject(LogData.class,
                            new LogData(param, requestIp, requestTime, responseTime, returnCode));
        }
        appender.close();
    }

    public static void processQueueAndRemoveProcessed() {
        final ExcerptTailer tailer = singleChronicleQueue.createTailer();
        final long lastReadIndex = tailer.lastReadIndex();
        // tailer.toStart().direction(TailerDirection.FORWARD);
        // tailer.moveToIndex(82038170320897L);
        // why do I need to put 3 here? how can I get the current queue size?
        for (int i = 0; i < 3; i++) {
            try (DocumentContext document = tailer.readingDocument()) {
                if (document.isPresent()) {
                    long index = tailer.index();
                    final Wire wire = document.wire();
                    final Bytes<?> bytes = wire.bytes();
                    LogData logData = bytes.readObject(LogData.class);
                    System.out.println(index);
                    System.out.println(logData);
                    // how do I remove the current logData from the queue?
                }
            }
        }
        tailer.close();
    }
}
Simple Test Application
import org.junit.jupiter.api.Test;

import java.sql.Timestamp;

public class TestQueue {

    @Test
    public void pushTest() {
        final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
        ExampleMain.pushQueue("param1", "127.0.0.1", timestamp, timestamp, "200");
        ExampleMain.pushQueue("param1", "192.168.0.2", timestamp, timestamp, "403");
        ExampleMain.pushQueue("param1", "127.0.0.1", timestamp, timestamp, "423");
    }

    @Test
    public void tryToSend() {
        ExampleMain.processQueueAndRemoveProcessed();
    }
}
Chronicle Queue does not support removal of individual entries - it is an immutable, append-only event store. tailer.readingDocument() only reads; it never deletes anything, and a plain tailer starts from the beginning of the queue each time you create it, which is why your second run prints the same 3 objects.
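What you can do instead is track how far your consumer has read. Below is a rough, untested sketch (assuming Chronicle Queue 5.x, your existing singleChronicleQueue field and LogData class; sendToSql is a hypothetical placeholder for your JDBC call) of how processQueueAndRemoveProcessed could look. A named tailer persists its read position with the queue, so already-sent entries are skipped on the next run and after a restart; the loop runs until no document is present, so no hard-coded count is needed; and on a failed SQL call the tailer is rewound to the failed entry so it is retried next time.

    public static void drainQueueToSql() {
        // Named tailer: its read position is stored with the queue, so entries
        // that were already processed are skipped on the next run / after restart.
        try (ExcerptTailer tailer = singleChronicleQueue.createTailer("sql-sender")) {
            long failedIndex = -1;
            while (failedIndex < 0) {
                try (DocumentContext document = tailer.readingDocument()) {
                    if (!document.isPresent()) {
                        break; // queue drained - no need to know the size up front
                    }
                    long index = tailer.index(); // index of the entry being read
                    LogData logData = document.wire().bytes().readObject(LogData.class);
                    if (!sendToSql(logData)) {   // placeholder for your SQL insert
                        failedIndex = index;     // remember the entry that failed
                    }
                }
            }
            if (failedIndex >= 0) {
                // Rewind so the failed entry (and everything after it) is re-read
                // the next time this method runs.
                tailer.moveToIndex(failedIndex);
            }
        }
    }

    // Placeholder: returns true when the SQL insert succeeded.
    private static boolean sendToSql(LogData logData) {
        return true;
    }

Recent 5.x releases also provide DocumentContext#rollbackOnClose(), which is intended to keep a tailer from advancing past the current entry; if it behaves that way on 5.22.18 it can replace the moveToIndex bookkeeping.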
You can, however, archive or remove old data once a roll-cycle file is no longer in use - see roll cycles in the Chronicle Queue documentation.
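As a minimal sketch of that (assuming the StoreFileListener hook on SingleChronicleQueueBuilder in 5.x; the archiving itself is left as a placeholder), you can register a callback that fires when a roll-cycle file is released and archive or delete the file there:

    import net.openhft.chronicle.queue.RollCycles;
    import net.openhft.chronicle.queue.impl.StoreFileListener;
    import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
    import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

    import java.io.File;

    public class QueueWithCleanup {

        public static SingleChronicleQueue build() {
            return SingleChronicleQueueBuilder.single("logqueue")
                    .rollCycle(RollCycles.FAST_DAILY)
                    .storeFileListener(new StoreFileListener() {
                        @Override
                        public void onReleased(int cycle, File file) {
                            // This process no longer uses the cycle file; archive or
                            // delete it here (placeholder - make sure nothing in the
                            // file still needs to be replayed).
                            System.out.println("Released cycle file: " + file);
                        }
                    })
                    .build();
        }
    }

Note that onReleased only tells you this process has finished with the file; whether the entries inside were actually sent to SQL is still tracked by your consumer (the named tailer above).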