I'm trying to modify the functionality in my teacher's module for saving tweets from Twitter's Streaming API. The problem is that if I keep the stream open for more than a minute, the API starts sending blank lines to the stream in order to verify its connection.
From what I've understood, the tweets are broken down and then reassembled, but this becomes a problem with the last function in the code below, pop_size.
Pop_size expects the argument to start with a text-representation of a number. So when the blank lines are sent, represented by <<"\r\n">>, the stream crashes with this message:
Error in process <0.118.0> with exit value: {function_clause,
[{twitterminer_source,pop_size,[<<2 bytes>>],[{file,"src/twitterminer_source.erl"}
If I add the line below, does anyone have any suggestions what I should try executing in it?
pop_size(<<"\r\n">>) -> %%Code here!
To clarify: I want the blank line to be disregarded and the loop to continue checking for the next tweets. I'm quite over my head here, but I'll try to answer any follow-up questions as elaborate as I can.
Code: (Three functions)
% Get HTTP chunks and reassemble them into chunks that we get
% as a result of specifying delimited=length.
% https://dev.twitter.com/streaming/overview/processing
split_loop(Sink, Sender, Buffer) ->
case pop_size(Buffer) of
{size, N, Rest} ->
case buffer_pop_n(Rest, N, Sender) of
{pop, Chunk, NewBuf} ->
Sink ! {message, Chunk},
receive next -> ok end,
split_loop(Sink, Sender, NewBuf);
{incomplete, Chunk} -> Sink ! {error, {incomplete, Chunk}};
{terminate, _Chunk} -> Sink ! terminate;
{error, Reason, Chunk} -> Sink ! {error, {Reason, Chunk}}
end;
{more, L} ->
case buffer_pop_n(Buffer, L, Sender) of
{pop, Chunk, NewBuf} ->
split_loop(Sink, Sender, <<Chunk/binary, NewBuf/binary>>);
{incomplete, <<>>} -> Sink ! finished;
{incomplete, Chunk} -> Sink ! {error, {incomplete, Chunk}};
{terminate, _Chunk} -> Sink ! terminate;
{error, Reason, Chunk} -> Sink ! {error, {Reason, Chunk}}
end
end.
% Get a chunk of N bytes from the buffer. If there is not enough data
% in the buffer, get more messages from the pipeline.
buffer_pop_n(B, N, Sender) ->
if
byte_size(B) < N ->
Sender ! next,
receive
{message, Part} ->
Part2 = Part,
buffer_pop_n(<<B/binary, Part2/binary>>, N, Sender);
finished -> {incomplete, B};
terminate -> {terminate, B};
{error, Reason} -> {error, Reason, B}
end;
true -> {pop, binary:part(B, {0, N}), binary:part(B, {N, byte_size(B)-N})}
end.
% We should also support discarding \r\n here
% (see 'blank lines' in https://dev.twitter.com/streaming/overview/messages-types)
pop_size(<<>>) -> {more, 1};
pop_size(<<A,Rest/binary>>) when A >= $0, A =< $9 ->
pop_size((A - $0), 1, Rest);
pop_size(_N, L, <<>>) -> {more, L+1};
pop_size(_N, L, <<"\r">>) -> {more, L+2};
pop_size(N, L, <<A,Rest/binary>>) when A >= $0, A =< $9 ->
pop_size(N * 10 + (A - $0), L+1, Rest);
pop_size(N, _L, <<"\r\n",Rest/binary>>) -> {size, N, Rest}.