How to unit test sarama kafka in Go?

239 Views Asked by At

I'm trying to unit test kafka client in my library messaging project but got timeout error. I've copy everything from the sarama library but still got the same timeout error. I don't know why... can someone explain it to me? I'm using version 1.38.0

Here's my unit test

func TestSimpleClient(t *testing.T) {
    seedBroker := sarama.NewMockBroker(t, 1)

    seedBroker.Returns(new(sarama.MetadataResponse))

    client, err := NewClient([]string{seedBroker.Addr()}, WithSaramaConfig(mocks.NewTestConfig()))
    if err != nil {
        t.Fatal(err)
    }

    seedBroker.Close()
    safeClose(t, client)
}

Here's what i copy from the sarama library.

func TestSimpleClient(t *testing.T) {
    seedBroker := NewMockBroker(t, 1)

    seedBroker.Returns(new(MetadataResponse))

    client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
    if err != nil {
        t.Fatal(err)
    }

    seedBroker.Close()
    safeClose(t, client)
}

But when i change to this implementation. The unit test is working for me

func initSimpleBroker(t *testing.T) *sarama.MockBroker {
    topics := []string{"test.topic"}
    mockBroker := sarama.NewMockBroker(t, 0)

    mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
        "MetadataRequest": sarama.NewMockMetadataResponse(t).
            SetBroker(mockBroker.Addr(), mockBroker.BrokerID()).
            SetLeader(topics[0], 0, mockBroker.BrokerID()).
            SetController(mockBroker.BrokerID()),
        "ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t),
        "OffsetRequest": sarama.NewMockOffsetResponse(t).
            SetOffset(topics[0], 0, sarama.OffsetOldest, 0).
            SetOffset(topics[0], 0, sarama.OffsetNewest, 1),
    })
    return mockBroker
}

func TestNewClient_Success(t *testing.T) {

    mockBroker := initSimpleBroker(t)
    config := mocks.NewTestConfig()

    // Call NewClient with the mock client
    kc, err := NewClient([]string{mockBroker.Addr()}, WithSaramaConfig(config))

    // Assert that the client was created successfully
    assert.NoError(t, err)
    assert.NotNil(t, kc)

    // Closing
    mockBroker.Close()
    safeClose(t, kc)

    // Assert
    assert.Nil(t, kc.client)
}

I've been trying to unit test my messaging library project that use kafka. But i'm confuse when i got the timeout error.

0

There are 0 best solutions below