Multi Executors Asynchronously to Write and Read Replies from TCP/IP Socket using Java

53 Views Asked by At

This is my first time coding for the socket communication so you might see a beginner level's approach. Please bear with it. I am trying to send and receive data from a device (devices in next step) through socket communication. the device receives commands and replies back in a second. I am using 2 executors asynchronously. continuousCommandScheduler is to send commands after every 20 seconds and the other singleCommandScheduler is to send command after every one minute for limited no of times.

This code is for communication. CommunicationResult is just a class having getters and setters for finalData, outputData,outputLength and error.

import java.io.*;
import java.net.*;
import java.util.concurrent.\*;
import com.fazecast.jSerialComm.SerialPort;
import com.fazecast.jSerialComm.SerialPortDataListener;
import com.fazecast.jSerialComm.SerialPortEvent;
import com.fazecast.jSerialComm.SerialPortIOException;
import com.fazecast.jSerialComm.SerialPortInvalidPortException;
import com.fazecast.jSerialComm.SerialPortTimeoutException;
import javafx.application.Platform; 

public class Communication {
     public static CompletableFuture<CommunicationResult> socketCommunication(Socket socket, String command)
                throws IOException {
            CompletableFuture<CommunicationResult> futureresult = new CompletableFuture<>();
            try {
                OutputStream output = socket.getOutputStream();
                socket.setSoTimeout(2000);
    
                InputStream input = socket.getInputStream();
                BufferedReader read = new BufferedReader(new InputStreamReader(input));
                output.write(command.getBytes());
                output.flush();
    
                StringBuilder buffer = new StringBuilder(); // Buffer for accumulating data
                boolean commandComplete = false; // Flag to indicate completion of a command.
    
                while (true) {
                    int character = read.read();
                    if (character == -1) {
                        futureresult.completeExceptionally(new SocketTimeoutException("Read timeout"));
                        break;
                    }
    
                    char ch = (char) character;
                    // Append new data if the command is not complete or if it's the start of a new command.
                    if (!commandComplete || ch == ':') {
                        buffer.append(ch);
                    }
    
                    // When "\r" is received, mark command as complete and process the data.
                    if (ch == '\r') {
                        String finalData = buffer.toString();
                        buffer.setLength(0);
                        CommunicationResult result = new CommunicationResult();
                        Platform.runLater(() -> {
                            result.setFinalData(finalData);
                            String outputData = Checksum.calculateReplyChecksum(finalData);
                            result.setOutputData(outputData);
                            String outputLength = Integer.toString(finalData.length());
                            result.setOutputLength(outputLength);
                            futureresult.complete(result);
                        });
                        commandComplete = true;
                        buffer.setLength(0); // Clear the buffer for the next command response
                        break;
                    }
                }
                socket.setSoTimeout(5000);
                buffer.setLength(0);
            } catch (SocketTimeoutException | UnknownHostException e) {
                handleSocketException(futureresult, e);
            } catch (Exception e) {
                handleSocketException(futureresult, e);
            }
            return futureresult;
        }
      private static CommunicationResult createErrorResult(String errorMessage) {
            CommunicationResult result = new CommunicationResult();
            result.setError(errorMessage);
            return result;
        }
        private static void handleSocketException(CompletableFuture<CommunicationResult> future, Exception e) {
            CommunicationResult result = createErrorResult(e.getMessage());
            future.complete(result);
            ClassAlertMessages.showErrorDialogue("Error", "Exception occured: " + result.getError());
        }
    }

This is my GUI class which is designed for the user to able to connect with the device and send and receive data.

import java.io.*;
import java.net.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.fazecast.jSerialComm.*;
import javafx.scene.paint.Color;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.event.EventHandler;
import javafx.geometry.HPos;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Node;
import javafx.scene.Scene;
import javafx.scene.control.*;
import javafx.scene.control.cell.PropertyValueFactory;
import javafx.scene.input.KeyCode;
import javafx.scene.input.KeyEvent;
import javafx.scene.layout.*;
import javafx.scene.text.*;

public class MainConnectionClass {
    //code to design GUI.
    ...
      btnConnect.setOnAction(event -> {
                handleSocketConnection();
            });
       btnSend.setOnAction(event -> {
    
                String inputValue = commandField.getText();
    
                String formattedChecksum = Checksum.calculateUIChecksum(inputValue);
                checksumField.setText(formattedChecksum);
                sendAndGetReply(inputValue).thenAccept(result -> {
                    String finalData = result.getFinalData();
                    String outputData = result.getOutputData();
                    String outputLength = result.getOutputLength();
                    String error = result.getError();
    
                    Platform.runLater(() -> {
                        replyField.appendText(finalData + "\n");
                        outputchecksumField.setText(outputData);
                        lengthField.setText(outputLength);
                        errorLabel.setText(error);
                    });
                });
            });
     public void handleSocketConnection() {
            if (!isConnected) {
                connectSocket();
            } else {
                disconnectSocket();
            }
        }
    private void connectSocket() {
            hostName = hostNameField.getText();
            try {
                port = Integer.parseInt(portField.getText());
                socket = new Socket();
                socket.connect(new InetSocketAddress(hostName, port), 5000);
                statusLabel.setText("Connected");
                statusLabel.setTextFill(Color.GREEN);
                btnConnect.setText("Disconnect");
                isConnected = true;
                btnSend.setDisable(false);
                hostNameField.setDisable(true);
                portField.setDisable(true);
                comportPane.setDisable(true);
                instrumentPane.setDisable(true);
                textPane.setDisable(true);
            } catch (NumberFormatException ex) {
                ClassAlertMessages.showErrorDialogue("Error",
                        "Invalid Hostname or Port. Please Enter a valid values. " + ex.getMessage());
            } catch (IOException ex) {
                ClassAlertMessages.showErrorDialogue("Error", ex.getMessage() + "\nCheck your connection and Try again." );
            }
        }
    
        private void disconnectSocket() {
            if (socket != null && !socket.isClosed()) {
                try {
                    socket.close();
                    isConnected = false;
                    btnConnect.setText("Connect");
                    statusLabel.setText("Not Connected");
                    statusLabel.setTextFill(Color.GRAY);
                    portField.setDisable(false);
                    btnSend.setDisable(true);
                    hostNameField.setDisable(false);
                    comportPane.setDisable(false);
                    instrumentPane.setDisable(false);
                    textPane.setDisable(false);
                } catch (IOException ex) {
                    ClassAlertMessages.showErrorDialogue("Error", "Disconnection Error: " + ex.getMessage());
                }
            }
        }
    public CompletableFuture<CommunicationResult> sendAndGetReply(String inputValue) {
    
            CompletableFuture<CommunicationResult> futureresult = new CompletableFuture<>();
     if (isConnected && !inputValue.isEmpty()) {
                try {
                    if (socket != null && !socket.isClosed()) {
                        String formattedChecksum = Checksum.calculateUIChecksum(inputValue);
                        String wholeCommand = inputValue + formattedChecksum + "\r";
                        return Communication.socketCommunication(socket, wholeCommand);
                    }
                    else{
                        futureresult.completeExceptionally(new IOException("Socket is not open"));
                    }
                } catch (Exception e) {
                    handleCommunicationException(futureresult, e);
                    e.printStackTrace();
                }
            } 
            else {
                futureresult.completeExceptionally(new IOException("Connection Problem"));
            }
            return futureresult;
        }
    }

So I am registering the instruments/devices through my MainConnection GUI class and then from this class, through button clicks I am Running both my executors.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Node;
    
public class StageTables {
      private ScheduledExecutorService singleCommandScheduler = Executors.newScheduledThreadPool(1);
        private ScheduledExecutorService continuousCommandScheduler;
    
    private MainConnectionClass mainApp = new MainConnectionClass();
     public StageTables(MainConnectionClass mainApp) {
            this.mainApp = mainApp;
            continuousCommandScheduler = Executors.newScheduledThreadPool(0);
        }
    btnStartSampling.setOnAction(event -> {
    
                if (!registeredInstruments.isEmpty()) {
                    CompletableFuture<Void> multipleFuture = CompletableFuture.runAsync(() -> sendMultipleCommands());
                    multipleFuture.thenRun(() -> {
                        singleCommandScheduler("10178", 10, 1);
                        System.out.println("Command Function Executed... ");
                    });
                } else {
                    ClassAlertMessages.showWarningMessage(null, "Warning",
                            "Either no instrumenst is registered to start sampling.");
                }
            });
     btnStopSampling.setOnAction(event -> {
    
                if (singleCommandScheduler != null && !registeredInstruments.isEmpty()) {
                    System.out.println(singleCommandScheduler);
                    sendCommands("6");
                    System.out.println("Stop Sampling");
                    shutdownSingleCommandScheduler();
                } else {
                    ClassAlertMessages.showWarningMessage(null, "Warning",
                            "Either no instruments is registered or Sampling is not running");
                }
            });
    
            btnShowTables.setOnAction(event -> {
                createTabs();
                startSendingCommands();
            });
    
     public void startSendingCommands() {
            try {
                continuousCommandScheduler.scheduleAtFixedRate(this::continuousCommandSender, 0, 20, TimeUnit.SECONDS);
            } catch (Exception e) {
                LOGGER.severe("Exception occured while sending command (4): " + e.getMessage());
            }
        }
    
        private void continuousCommandSender() {
            commands = new String[] { "F", "E" };
            int delay = 0;
            for (String command : commands) {
                String inputValue = registeredInstruments.get(0) + command;
                continuousCommandScheduler.schedule(() -> continuousCommandScheduler(inputValue, command), delay,
                        TimeUnit.SECONDS);
                delay += 5;
            }
        }
     private void continuousCommandScheduler(String inputValue, String command) {
            try {
                CompletableFuture<CommunicationResult> commandReplies = mainApp.sendAndGetReply(inputValue);
                System.out.println("\nInput Value (ContSender): " + inputValue);
                commandReplies.whenCompleteAsync((result, throwable) -> {
                    if (throwable != null) {
                        LOGGER.severe("Error occured while sending continuous command: " + throwable.getMessage());
                        throwable.printStackTrace();
                        ClassAlertMessages.showErrorDialogue("Error Occured",
                                "Unable to send command: " + throwable.getMessage());
                    } else {
                        Platform.runLater(() -> {
                            try {
                                finalData = result.getFinalData();
                                if (finalData != null) {
                                    if (Validations.isDataValid(finalData, inputValue.substring(1)) == true) {
                                        if (finalData.length() == 46) {
                                            System.out.println("REPLY for F (ContSender): " + finalData);
                                        } else if (finalData.length() == 26) {
                                            System.out.println("REPLY for E (ContSender): " + finalData);
                                        } else {
                                            System.out.println("Invalid reply (ContSender): " + finalData);
                                            System.out.println("\nNot Valid Data");
                                        }
                                    }
                                } else {
                                    System.out.println("\nNull Data");
                                }
                            } catch (Exception e) {
                                LOGGER.severe(
                                        "Error occured while processing continuous command result: " + e.getMessage());
                                e.printStackTrace();
                            }
                        });
                    }
                });
            } catch (Exception e) {
                LOGGER.severe("Error occured while sending continuous command: " + e.getMessage());
                ClassAlertMessages.showErrorDialogue("Exception Occured", "Unable to send command: " + e.getMessage());
                e.printStackTrace();
            }
        }
       private void sendMultipleCommands() {
            String value = "801A501" + setStorageIntervalValue.getText();
            String[] commands = { value, "701A5", "4" };
            CompletableFuture<Void> previousCommand = CompletableFuture.completedFuture(null);
            for (String command : commands)
                previousCommand = previousCommand.thenRunAsync(() -> {
                    try {
                        sendCommands(command);
                        Thread.sleep(1500);
                    } catch (InterruptedException e) {
                        LOGGER.severe("Interupted while sending command (1): " + e.getMessage());
                        e.printStackTrace();
                    }
                });
        }
      private void singleCommandScheduler(String command, int repeatCount, long intervalTime) {
    
            String inputValue = registeredInstruments.get(0) + command;
            AtomicInteger commandCounter = new AtomicInteger(repeatCount);
            Runnable commandSender = () -> {
                if (commandCounter.get() > 0 && !singleCommandScheduler.isTerminated()) {
                    CompletableFuture<CommunicationResult> commandReplies = mainApp.sendAndGetReply(inputValue);
                    System.out.println("\nInput Value (Repeated Single): " + inputValue);
                    commandReplies.whenComplete((result, throwable) -> {
                        if (throwable != null) {
                            throwable.printStackTrace();
                        } else {
                            try {
                                finalData = result.getFinalData();
                                System.out.println("\nRepeated Data Output: " + finalData);
                                if (finalData != null) {
                                    if (Validations.samplingDataValidation(finalData, inputValue.substring(1)) == true) {
                                        if (finalData.length() == 78) {
                                            System.out.println("Valid Data");
                                            gettingTableValues(finalData);
                                            commandCounter.decrementAndGet();
                                        } else {
                                            System.err.println("Invalid data");
                                        }
                                    }
                                } else {
                                    System.out.println("Null Data");
                                }
                            } catch (Exception e) {
                                LOGGER.severe("Error occured while sending command (2): " + e.getMessage());
                                e.printStackTrace();
                            }
                        }
                    });
                } else {
                    singleCommandScheduler.shutdown();
                    sendCommands("6");
                    System.out.println("Sampling done");
                }
            };
    
            singleCommandScheduler.scheduleAtFixedRate(commandSender, 0, intervalTime, TimeUnit.MINUTES);
        }
    }

This above whole code is working fine as in other class I am using thread to get the values for creating a document but the problem I am encountering is when I run executors.

This happens when there is a read time out exception. I started getting the result of previous command


Input Value (ContSender): !O0265E
REPLY for E (ContSender): :O0265E030001014F00014024

Input Value (ContSender): !O0265F
REPLY for F (ContSender): :O0265F0401000B03E77A95420001002400104D0F0F6E

Input Value (ContSender): !O0265E

Null Data

Input Value (ContSender): !O0265F
REPLY for E (ContSender): :O0265E030001014F00020021

Input Value (ContSender): !O0265E
REPLY for F (ContSender): :O0265F0401000B03E77A95460001002400104D0F0F72

This is the result when both executors and a thread are running.


Input Value (ContSender): !O0265F
REPLY for F (ContSender): :O0265F04010013044479994000018E2400104D0F0F64

Input Value (ContSender): !O0265E
REPLY for E (ContSender): :O0265E040001010100364115

Input Value (Repeated Single): !O026510178

Repeated Data Output: null
Null Data

Input Value (ContSender): !O0265F
Invalid reply (ContSender): :O026510178016C01460200015A00000000000000000161013A01F7015400000000000001635D

Not Valid Data

Input Value (Repeated Single): !O026510178

Repeated Data Output: :O026510178016B013F020A016500000000000000000162013A01F90156000000000000016376
Valid Data

Input Value (ContSender): !O0265F
REPLY for F (ContSender): :O0265F040100160450799A4400018E2400104D0F0F70

Input Value (ContSender): !O0265E

Null Data

Input Value (ContSender): !O0265F
REPLY for E (ContSender): :O0265E04000101010040010C

Input Value (ContSender): !O0265E
REPLY for F (ContSender): :O0265F040100170454799A4000018E2400104D0F0F71

Input Value (ContSender): !O0265F
REPLY for E (ContSender): :O0265E04000101010040210E

Input Value (ContSender): !O0265E
REPLY for F (ContSender): :O0265F040100170454799A4400018E2400104D0F0F75

This is the result I want. Means if any data is miss or null, or whatever, the buffer must be at length 0 before receiving the next command. I am getting this result through comport communication using JSerialComm library. Everything else is same for that. just the communication code is different.


Input Value (ContSender): !O4101F
REPLY for F (ContSender): :O4101F280106F30F1B7A9842400100240040276F0F8C

Counter: 10
Command Function Executed...

SENT: !O4101801A50101

Input Value (Repeated Single): !O410110178
REPLY (sendCommnads): :O4101801A5010199

SENT: !O4101701A5
REPLY (sendCommnads): :O4101701A5010199

Input Value (ContSender): !O4101E
REPLY for E (ContSender): :O4101E27240304FF12074252

SENT: !O41014
REPLY (sendCommnads): :O41014280106F30F1B7A9844400130240040276F0F91

Input Value (Repeated Single): !O410110178

Repeated Data Output: :O4101101780000000000000000000000000000000000000000000000000000000000000000D0
Valid Data

Counter: 9

Input Value (ContSender): !O4101E
REPLY for E (ContSender): :O4101E272403040112084228

Input Value (ContSender): !O4101F

Not Valid Data

Input Value (ContSender): !O4101E
REPLY for E (ContSender): :O4101E282403040112090226

Input Value (ContSender): !O4101F
REPLY for F (ContSender): :O4101F280106F50F6079994240318E240040276F0F9A

Please guide me how can I solve this issue. I tried everything I can find to solve it but I am stuck now. Unable to crack this issue. Thank you.

0

There are 0 best solutions below