I need to add socket support in the backend for real-time chat. Initially, I implemented an automatic reply feature in the API for a mobile app. This feature automatically replies after a user sends a message to another person. Although the queue is working, it's not real-time. Now, I want to add a socket in the backend so the user can receive the automatic message in real time.
use ElephantIO\Client;
use ElephantIO\Engine\SocketIO\Version2X;
class UserChatbotAuto implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $request;
protected $user;
protected $curentUser;
protected $token;
public function __construct(array $request, $token, $curentUser, $user)
{
//
$this->request = $request;
$this->user = $user;
$this->curentUser = $curentUser;
$this->token = $token;
}
public function handle()
{
$user = $this->user;
$curentUser = $this->curentUser;
$request = $this->request;
$token = $this->token;
try {
Log::info('start queue' );
$thread = $user->getThreadWithUser($curentUser);
$option = [
// 'handshake' => [
'auth' => [
'token' => 'Bearer ' . $token,
'threadWith' => $thread->id
]
// ]
];
$yourApiKey = config('services.openai.secret');
$client = OpenAI::client($yourApiKey);
$result = $client->chat()->create([
'model' => 'gpt-4',
'messages' => [
[
"role" => "system",
"content" => "You are a mental health adviser, skilled in giving mental health related advice. Please answer in the language after the word question. No yapping"
],
['role' => 'user', 'content' => "give mental health advice for given question. my name is: " . $curentUser->name . ", only give the advice text don't write anything else. question: " . $request['message']],
],
]);
$content_response = $result->choices[0]->message->content;
Log::info('content_response: ' . $content_response);
$message = Message::create([
'thread_id' => $thread->id,
'user_id' => $user->id,
'body' => $content_response,
]);
$client = new Client(new Version2X('http://19.......11:8001', $option));
$client->initialize();
$client->emit('sendMessageToUser', ['userId' => $user->id, 'message' => $content_response]);
$client->close();
} catch (\Exception $e) {
Log::error($e);
}
Log::info('done queue' );
}
}
This is the socket configuration file named server.js, in the mobile, it still working if a real user chat to a real user and the result is both user chat in real time
require("dotenv").config();
const express = require("express");
const fetch = require("node-fetch");
const app = express();
const server = require("http").createServer(app);
const mysql = require("mysql2");
const baseUrl = "http://1........1:8001";
const io = require("socket.io")(server, {
cors: {
origin: "*",
},
});
// Create a connection pool
const pool = mysql.createPool({
host: process.env.DB_HOST,
user: process.env.DB_USERNAME,
password: process.env.DB_PASSWORD,
database: process.env.DB_DATABASE,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
});
const userConnectionDb = [];
const findConnectionIndexById = (socketId) => {
const index = userConnectionDb.findIndex(
(user) => user.socketId === socketId
);
if (index === -1) {
return null;
}
return index;
};
const findByUserId = (userId) => {
const index = userConnectionDb.findIndex((user) => user.userId === userId);
if (index === -1) {
return null;
}
return index;
};
const getSocketIdByUserId = (userId) => {
const index = userConnectionDb.findIndex((user) => user.userId === userId);
if (index !== -1) {
return userConnectionDb[index].socketId;
} else {
return null;
}
};
const validateUser = async (authToken) => {
try {
let user = null;
//console.lo;
const endPoint = baseUrl + "/api/profile/socket-profile";
const options = {
method: "GET",
headers: {
"Content-Type": "application/json",
Accept: "application/json",
Authorization: authToken,
},
};
const response = await fetch(endPoint, options);
if (!response.ok) {
console.log({ status: response.status });
throw new Error("Network response was not OK");
}
const responseData = await response.json();
const userData = {
userId: responseData.id,
};
user = userData;
return { user, error: null };
} catch (error) {
console.log(error);
return { user: null, error: error };
}
};
const sendMessageToServer = async (senderToken, receiver, message) => {
try {
let user = null;
const endPoint = baseUrl + "/api/message/send-socket-message";
const options = {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json",
Authorization: senderToken,
},
body: JSON.stringify({ user_id: receiver, message }),
};
const response = await fetch(endPoint, options);
if (!response.ok) {
console.log({ status: response.status });
throw new Error("Network response was not OK");
}
const responseData = await response.json();
// console.log("message sent", responseData);
return { data: responseData, error: null };
} catch (error) {
return { data: null, error };
}
};
// Middleware to handle authentication
io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token;
const threadWith = socket.handshake.auth.threadWith;
// Perform authentication logic (e.g., verify the token)
const { user, error } = await validateUser(token);
if (error) throw new Error(error);
if (!user) {
// Authentication failed, reject the connection
return next(new Error({ message: "Authentication failed", code: 401 }));
}
const userIndex = findByUserId(user.userId);
if (userIndex !== null) {
userConnectionDb.splice(userIndex, 1);
}
userConnectionDb.push({
userId: user.userId,
socketId: socket.id,
threadWith: threadWith || null,
token,
});
return next();
} catch (error) {
console.log(error);
return next(new Error({ message: "Server error", code: 500 }));
}
});
io.on("connection", (socket) => {
console.log("New client connected Total users:", userConnectionDb.length);
socket.on("message", (message) => {
console.log("Received message:", message);
});
socket.on("disconnect", () => {
const userIndex = findConnectionIndexById(socket.id);
if (userIndex !== null) {
userConnectionDb.splice(userIndex, 1);
}
console.log("Client disconnected Total users:", userConnectionDb.length);
});
// Handling the client's custom event
socket.on("sendMessageToUser", async (data, callback) => {
const { userId, message } = data;
let callbackData = {
success: true,
online: false,
data: null,
};
const socketId = getSocketIdByUserId(userId);
if (socketId) {
const otherUserIndex = findByUserId(userId);
const currentUserIndex = findConnectionIndexById(socket.id);
if (otherUserIndex === null || currentUserIndex === null) {
callbackData.success = false;
callback(callbackData);
return;
}
const currentUserId = userConnectionDb[currentUserIndex].userId;
const senderToken = userConnectionDb[currentUserIndex].token;
const threadWithId = userConnectionDb[otherUserIndex].threadWith;
// console.log({ threadWithId, currentUserId });
if (threadWithId === currentUserId) {
const { data, error } = await sendMessageToServer(
senderToken,
userId,
message
);
if (error) {
console.log(error);
return;
}
io.to(socketId).emit("customMessage", {
sendBy: currentUserId,
data: data,
});
callbackData.online = true;
callbackData.success = true;
callbackData.data = data;
} else {
// console.log("Use is not on same thread");
}
} else {
// console.log(" no user online with this id ");
}
// Send acknowledgment back to the client
callback(callbackData);
});
});
const port = 8001;
server.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
I tried calling the API to call the function and execute the queue above, but it returned an error:
ElephantIO\Exception\ServerConnectionFailureException: An error occurred while trying to establish a connection to the server in C:\xampp\htdocs\Project\tongle_latest\vendor\wisembly\elephant.io\src\Engine\SocketIO\Version1X.php:187
What can I do? Does anyone have any solution?