Jedis: how could the subscriber thread started from the main thread have stopped?


I'll give some context and hope you can suggest how this issue could happen.

First, the main method for the whole app is shown here.

public static void main(String args[]) throws Exception {
    AppConfig appConfig = AppConfig.getInstance();
    appConfig.initBean("applicationContext.xml");
    SchedulerFactory factory=new StdSchedulerFactory();
    Scheduler _scheduler=factory.getScheduler();
    _scheduler.start();
    Thread t = new Thread((Runnable) appConfig.getBean("consumeGpzjDataLoopTask"));
    t.start();
}

The main method does just three things: it initializes the beans the Spring way, starts the Quartz scheduler thread, and starts the sub thread, which subscribes to one channel through Jedis and listens for messages continuously. Here is the code for the sub thread, which starts the subscription:

@Override
public void run() {
    Properties pros = new Properties();
    Jedis sub = new Jedis(server, defaultPort, 0);
    sub.subscribe(subscriber, channelId);
}

and the thread stack when a message is received:

[screenshot of the thread stack]

But something weird happened in the production environment. The Quartz job scheduler is running properly, while consumeGpzjDataLoopTask seems to have exited somehow! I really can't see how this could happen. As you can see, the sub thread creates a Jedis instance with timeout 0, which means it never times out, so I thought the sub thread would not stop unless something went badly wrong in the main thread. But in production, the publisher published messages normally and the messages simply disappeared, and nothing related could be found in the log file, as if the subscriber thread were already dead. BTW, I never ran into this when testing on my local machine.

Could you help me with possible causes of this issue? Leave a comment if any extra info is needed to analyze the problem. Thanks.
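One way to narrow this down, sketched under assumptions rather than taken from the app: if the sub thread dies because an exception escapes run(), the JVM by default prints the stack trace to stderr, which may never reach the application log file. Installing an uncaught exception handler on the thread would at least record why it died. The class name, the `startLogged` helper, and the `Consumer<String>` logger stand-in below are all hypothetical.

```java
import java.util.function.Consumer;

public class ThreadDeathLoggingSketch {

    /**
     * Starts the task on a named thread and reports any exception that
     * escapes run() to the given logger stand-in (hypothetical helper).
     */
    public static Thread startLogged(Runnable task, String name, Consumer<String> log) {
        Thread t = new Thread(task, name);
        // Called on the dying thread itself, just before it terminates.
        t.setUncaughtExceptionHandler((thread, error) ->
                log.accept("thread " + thread.getName() + " died: " + error));
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated task that fails the way a subscriber callback might.
        Runnable failingTask = () -> {
            throw new IllegalStateException("simulated failure");
        };
        Thread t = startLogged(failingTask, "consume-loop", System.out::println);
        t.join();
    }
}
```

In the real app the logger stand-in would be replaced by something like `logger::error`, and the Runnable would be the consumeGpzjDataLoopTask bean.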

Edited: For @code, here's the code for the subscriber.

public class GpzjDataSubscriber extends JedisPubSub {
    private static final Logger logger = LoggerFactory.getLogger(GpzjDataSubscriber.class);
    private static final String META_INSERT_SQL = "insert into dbo.t_cl_tj_transaction_meta_attributes\n" +
            "(transaction_id, meta_key, meta_value) VALUES (%d, '%s', '%s')";
    private static final String GET_EVENT_ID_SQL = "select id from t_cl_tj_monthly_golden_events_dict where target = ?";
    private static final String TRANSACTION_TB_NAME = "t_cl_tj_monthly_golden_stock_transactions";
    private static Map<String, Object> insertParams = new HashMap<String, Object>();
    private static Collection<String> metaSqlContainer = new ArrayList<String>();
    @Autowired(required = false)
    @Qualifier(value = "gpzjDao")
    private GPZJDao gpzjDao;

    public GpzjDataSubscriber() {}

    public void onMessage(String channel, String message) {
        consumeTransactionMessage(message);
        logger.info(String.format("gpzj data subscriber receives redis published message, channel %s, message %s", channel, message));
    }

    public void onSubscribe(String channel, int subscribedChannels) {
        logger.info(String.format("gpzj data subscriber subscribes redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }

    @Transactional(isolation = Isolation.READ_COMMITTED)
    private void consumeTransactionMessage(String msg) {
        final GpzjDataTransactionOrm jsonOrm = JSON.parseObject(msg, GpzjDataTransactionOrm.class);
        Map<String, String> extendedAttrs = (jsonOrm.getAttr() == null || jsonOrm.getAttr().isEmpty())? null : JSON.parseObject(jsonOrm.getAttr(), HashMap.class);

        if (jsonOrm != null) {
            SimpleJdbcInsert insertActor = gpzjDao.getInsertActor(TRANSACTION_TB_NAME);
            initInsertParams(jsonOrm);
            Long transactionId = insertActor.executeAndReturnKey(insertParams).longValue();

            if (extendedAttrs == null || extendedAttrs.isEmpty()) {
                return;
            }

            metaSqlContainer.clear();
            for (Map.Entry e: extendedAttrs.entrySet()) {
                metaSqlContainer.add(String.format(META_INSERT_SQL, transactionId.intValue(), e.getKey(), e.getValue()));
            }
            int[] insertMetaResult = gpzjDao.batchUpdate(metaSqlContainer.toArray(new String[0]));
        }
    }

    private void initInsertParams(GpzjDataTransactionOrm orm) {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Integer eventId = gpzjDao.queryForInt(GET_EVENT_ID_SQL, orm.getTarget());

        insertParams.clear();
        insertParams.put("khid", orm.getKhid());
        insertParams.put("attr", orm.getAttr());
        insertParams.put("event_id", eventId);
        insertParams.put("user_agent", orm.getUser_agent());
        insertParams.put("referrer", orm.getReferrer());
        insertParams.put("page_url", orm.getPage_url());
        insertParams.put("channel", orm.getChannel());
        insertParams.put("os", orm.getOs());
        insertParams.put("screen_width", orm.getScreen_width());
        insertParams.put("screen_height", orm.getScreen_height());
        insertParams.put("note", orm.getNote());
        insertParams.put("create_time", df.format(new Date()));
        insertParams.put("already_handled", 0);
    }
}
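A possible cause worth checking, offered as a guess and not confirmed by anything above: the blocking subscribe call returns to run() as soon as an exception propagates out of it, for example a dropped connection or a runtime exception thrown inside onMessage (a JSON parse error or a failed insert), and run() then ends without retrying. The sketch below shows a retry loop around a blocking subscription. To keep it runnable without Redis, the hypothetical `BlockingSubscription` interface stands in for `sub.subscribe(subscriber, channelId)`; the class name, `runWithRetry`, and the attempt and backoff parameters are likewise assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ResilientSubscriberSketch {

    /** Hypothetical stand-in for the blocking call sub.subscribe(subscriber, channelId). */
    public interface BlockingSubscription {
        void subscribeAndBlock() throws Exception;
    }

    /**
     * Calls the blocking subscription again whenever it fails with an
     * exception, up to maxAttempts times. Returns the number of attempts made.
     */
    public static int runWithRetry(BlockingSubscription subscription,
                                   int maxAttempts,
                                   long backoffMillis) throws InterruptedException {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                subscription.subscribeAndBlock();
                return attempts; // returned normally, e.g. after an unsubscribe
            } catch (Exception e) {
                // Record the reason instead of letting the thread die silently.
                System.err.println("subscription attempt " + attempts + " failed: " + e);
                Thread.sleep(backoffMillis);
            }
        }
        return attempts;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Simulated subscription: fails twice, then returns normally.
        int attempts = runWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated connection drop");
            }
        }, 5, 10L);
        System.out.println("attempts = " + attempts); // prints "attempts = 3"
    }
}
```

In the real task, each retry would presumably create a fresh Jedis instance before subscribing again, and wrapping the body of onMessage in its own try/catch would keep a single bad message from ending the subscription at all.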
