Skip to content
64 changes: 64 additions & 0 deletions examples/ChunkedResponse.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "App.h"

/* This example demonstrates a large chunked response streamed with tryWrite. */

#include <cstdint>
#include <memory>
#include <string>

namespace {

const std::string payload(16 * 1024 * 1024, 'x');

struct ResponseState {
uintmax_t baseOffset = 0;
bool aborted = false;
};

template <bool SSL>
bool tryWriteLoop(uWS::HttpResponse<SSL> *res, ResponseState *state) {
if (state->aborted) {
return true;
}

uintmax_t sent = res->getWriteOffset() - state->baseOffset;
std::string_view remaining = payload;
remaining.remove_prefix((size_t) sent);
if (res->tryWrite(remaining)) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
res->end();
return true;
}

return false;
}

}

int main() {

uWS::SSLApp({
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
}).get("/*", [](auto *res, auto */*req*/) {
auto state = std::make_shared<ResponseState>();
state->baseOffset = res->getWriteOffset();
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated

res->writeHeader("Content-Type", "application/octet-stream");
res->onAborted([state]() {
state->aborted = true;
});

if (!tryWriteLoop(res, state.get())) {
res->onWritable([res, state](uintmax_t) {
return tryWriteLoop(res, state.get());
});
}
}).listen(3000, [](auto *listen_socket) {
if (listen_socket) {
std::cout << "Listening on port " << 3000 << std::endl;
}
}).run();

std::cout << "Failed to listen on port 3000" << std::endl;
}
144 changes: 86 additions & 58 deletions src/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

#include "MoveOnlyFunction.h"

/* todo: tryWrite is missing currently, only send smaller segments with write */

namespace uWS {

/* Some pre-defined status constants to use with writeStatus */
Expand Down Expand Up @@ -73,6 +71,15 @@ struct HttpResponse : public AsyncSocket<SSL> {
Super::write(buf, length);
}

unsigned int formatChunkHeader(unsigned int value, char *dst) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
dst[0] = '\r';
dst[1] = '\n';
int hexLength = utils::u32toaHex(value, dst + 2);
dst[hexLength + 2] = '\r';
dst[hexLength + 3] = '\n';
return (unsigned int) hexLength + 4;
}

/* Called only once per request */
void writeMark() {
/* Date is always written */
Expand All @@ -87,6 +94,72 @@ struct HttpResponse : public AsyncSocket<SSL> {
#endif
}

/* Chunked writes can only be resumed by continuing the same body suffix. */
std::pair<bool, bool> internalWriteChunk(std::string_view data, bool optional, bool terminate = false) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
bool continuingChunk = httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
bool needsUncork = !Super::isCorked() && Super::canCork();
if (needsUncork) {
Super::cork();
}

if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}

bool completed = !continuingChunk || data.length();
bool failed = false;
if (data.length()) {
if (!continuingChunk) {
char chunkHeader[34];
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
unsigned int chunkHeaderLength = formatChunkHeader((unsigned int) data.length(), chunkHeader);
failed = Super::write(chunkHeader, (int) chunkHeaderLength).second;
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
}

auto writtenFailed = Super::write(data.data(), (int) data.length(), optional);
httpResponseData->offset += (uintmax_t) writtenFailed.first;
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
failed = failed || writtenFailed.second;

if (optional && (writtenFailed.first != (int) data.length() || failed)) {
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
completed = false;
} else {
httpResponseData->state &= ~HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
}
}

if (terminate && completed) {
Super::write("\r\n0\r\n\r\n", 7);
}

if (needsUncork) {
failed = failed || Super::uncork().second;
}

if (failed || terminate || !completed) {
Super::timeout(HTTP_TIMEOUT_S);
}

if (terminate && completed) {
httpResponseData->markDone();

if (!Super::isCorked()) {
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) == 0) {
if (((AsyncSocket<SSL> *) this)->getBufferedAmount() == 0) {
((AsyncSocket<SSL> *) this)->shutdown();
((AsyncSocket<SSL> *) this)->close();
}
}
}
}
}

return {completed, failed};
}

/* Returns true on success, indicating that it might be feasible to write more data.
* Will start timeout if stream reaches totalSize or write failure. */
bool internalEnd(std::string_view data, uintmax_t totalSize, bool optional, bool allowContentLength = true, bool closeConnection = false) {
Expand Down Expand Up @@ -116,42 +189,7 @@ struct HttpResponse : public AsyncSocket<SSL> {
}

if (httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED) {

/* We do not have tryWrite-like functionalities, so ignore optional in this path */

/* Do not allow sending 0 chunk here */
if (data.length()) {
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);

/* Ignoring optional for now */
Super::write(data.data(), (int) data.length());
}

/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);

httpResponseData->markDone();

/* We need to check if we should close this socket here now */
if (!Super::isCorked()) {
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) == 0) {
if (((AsyncSocket<SSL> *) this)->getBufferedAmount() == 0) {
((AsyncSocket<SSL> *) this)->shutdown();
/* We need to force close after sending FIN since we want to hinder
* clients from keeping to send their huge data */
((AsyncSocket<SSL> *) this)->close();
return true;
}
}
}
}

/* tryEnd can never fail when in chunked mode, since we do not have tryWrite (yet), only write */
Super::timeout(HTTP_TIMEOUT_S);
return true;
return internalWriteChunk(data, false, true).first;
} else {
/* Write content-length on first call */
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_END_CALLED)) {
Expand Down Expand Up @@ -452,33 +490,23 @@ struct HttpResponse : public AsyncSocket<SSL> {
bool write(std::string_view data) {
writeStatus(HTTP_200_OK);

/* Do not allow sending 0 chunks, they mark end of response */
if (!data.length()) {
/* If you called us, then according to you it was fine to call us so it's fine to still call us */
return true;
return !(getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING);
}

HttpResponseData<SSL> *httpResponseData = getHttpResponseData();

if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();

writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
return !internalWriteChunk(data, false).second;
}

Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
/* Try and write one chunk. Continue with the remaining body suffix on onWritable. */
bool tryWrite(std::string_view data) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
writeStatus(HTTP_200_OK);

auto [written, failed] = Super::write(data.data(), (int) data.length());
if (failed) {
Super::timeout(HTTP_TIMEOUT_S);
if (!data.length()) {
return !(getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING);
}

/* If we did not fail the write, accept more */
return !failed;
auto [completed, failed] = internalWriteChunk(data, true);
return completed && !failed;
}

/* Get the current byte write offset for this Http response */
Expand Down
3 changes: 2 additions & 1 deletion src/HttpResponseData.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
HTTP_WRITE_CALLED = 2, // used
HTTP_END_CALLED = 4, // used
HTTP_RESPONSE_PENDING = 8, // used
HTTP_CONNECTION_CLOSE = 16 // used
HTTP_CONNECTION_CLOSE = 16, // used
HTTP_WRITE_CONTINUATION_PENDING = 32 // used
};

/* Per socket event handlers */
Expand Down
Loading