Skip to content
117 changes: 116 additions & 1 deletion examples/Crc32.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,75 @@
#include <sstream>
#include <cstdint>
#include <cstddef>
#include <memory>
#include <string>
#include <sys/socket.h>

#include "../uSockets/src/libusockets.h"

namespace {

const std::string writeChunk(1024, 'a');
const std::string tryWritePayload(128 * 1024, 'x');
const std::string tryWriteEndPayload(256 * 1024, 'y');

struct WriteState {
int remaining = 128;
bool aborted = false;
};

struct TryWriteState {
const std::string *payload = nullptr;
uintmax_t baseOffset = 0;
bool aborted = false;
};

template <bool SSL>
void setSmallSendBuffer(uWS::HttpResponse<SSL> *res) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
#ifdef LIBUS_NO_SSL
int fd = (int) (uintptr_t) us_socket_get_native_handle(SSL, (us_socket_t *) res);
int sendBuffer = 4096;
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendBuffer, sizeof(sendBuffer));
#else
(void) res;
#endif
}

template <bool SSL>
void writeLoop(uWS::HttpResponse<SSL> *res, const std::shared_ptr<WriteState> &state) {
if (state->aborted) {
return;
}

if (!state->remaining) {
res->end();
return;
}

state->remaining--;
res->write(writeChunk);
uWS::Loop::get()->defer([res, state]() {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
writeLoop(res, state);
});
}

template <bool SSL>
bool tryWriteLoop(uWS::HttpResponse<SSL> *res, const std::shared_ptr<TryWriteState> &state) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
if (state->aborted) {
return true;
}

uintmax_t sent = res->getWriteOffset() - state->baseOffset;
std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent);
if (res->tryWrite(remaining)) {
res->end();
return true;
}

return false;
}

}

uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) {

Expand All @@ -31,11 +100,57 @@ uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) {
}

int main() {

uWS::SSLApp({
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
}).get("/write", [](auto *res, auto */*req*/) {
auto state = std::make_shared<WriteState>();

res->onAborted([state]() {
state->aborted = true;
});

uWS::Loop::get()->defer([res, state]() {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
writeLoop(res, state);
});
}).get("/trywrite", [](auto *res, auto */*req*/) {
setSmallSendBuffer(res);

auto state = std::make_shared<TryWriteState>();
state->payload = &tryWritePayload;
state->baseOffset = res->getWriteOffset();

res->onAborted([state]() {
state->aborted = true;
});

if (!tryWriteLoop(res, state)) {
res->onWritable([res, state](uintmax_t) {
return tryWriteLoop(res, state);
});
}
}).get("/trywrite-end", [](auto *res, auto */*req*/) {
setSmallSendBuffer(res);

auto state = std::make_shared<TryWriteState>();
state->payload = &tryWriteEndPayload;
state->baseOffset = res->getWriteOffset();

res->onAborted([state]() {
state->aborted = true;
});

if (!res->tryWrite(*state->payload)) {
res->onWritable([res, state](uintmax_t offset) {
uintmax_t sent = offset - state->baseOffset;
std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent);
res->end(remaining);
return true;
});
} else {
res->end();
}
}).post("/*", [](auto *res, auto *req) {

/* Display the headers */
Expand Down
144 changes: 86 additions & 58 deletions src/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

#include "MoveOnlyFunction.h"

/* todo: tryWrite is missing currently, only send smaller segments with write */

namespace uWS {

/* Some pre-defined status constants to use with writeStatus */
Expand Down Expand Up @@ -73,6 +71,15 @@ struct HttpResponse : public AsyncSocket<SSL> {
Super::write(buf, length);
}

unsigned int formatChunkHeader(unsigned int value, char *dst) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
dst[0] = '\r';
dst[1] = '\n';
int hexLength = utils::u32toaHex(value, dst + 2);
dst[hexLength + 2] = '\r';
dst[hexLength + 3] = '\n';
return (unsigned int) hexLength + 4;
}

/* Called only once per request */
void writeMark() {
/* Date is always written */
Expand All @@ -87,6 +94,72 @@ struct HttpResponse : public AsyncSocket<SSL> {
#endif
}

/* Chunked writes can only be resumed by continuing the same body suffix. */
std::pair<bool, bool> internalWriteChunk(std::string_view data, bool optional, bool terminate = false) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
bool continuingChunk = httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
bool needsUncork = !Super::isCorked() && Super::canCork();
if (needsUncork) {
Super::cork();
}

if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}

bool completed = !continuingChunk || data.length();
bool failed = false;
if (data.length()) {
if (!continuingChunk) {
char chunkHeader[34];
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
unsigned int chunkHeaderLength = formatChunkHeader((unsigned int) data.length(), chunkHeader);
failed = Super::write(chunkHeader, (int) chunkHeaderLength).second;
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
}

auto writtenFailed = Super::write(data.data(), (int) data.length(), optional);
httpResponseData->offset += (uintmax_t) writtenFailed.first;
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
failed = failed || writtenFailed.second;

if (optional && (writtenFailed.first != (int) data.length() || failed)) {
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
completed = false;
} else {
httpResponseData->state &= ~HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
}
}

if (terminate && completed) {
Super::write("\r\n0\r\n\r\n", 7);
}

if (needsUncork) {
failed = failed || Super::uncork().second;
}

if (failed || terminate || !completed) {
Super::timeout(HTTP_TIMEOUT_S);
}

if (terminate && completed) {
httpResponseData->markDone();

if (!Super::isCorked()) {
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) == 0) {
if (((AsyncSocket<SSL> *) this)->getBufferedAmount() == 0) {
((AsyncSocket<SSL> *) this)->shutdown();
((AsyncSocket<SSL> *) this)->close();
}
}
}
}
}

return {completed, failed};
}

/* Returns true on success, indicating that it might be feasible to write more data.
* Will start timeout if stream reaches totalSize or write failure. */
bool internalEnd(std::string_view data, uintmax_t totalSize, bool optional, bool allowContentLength = true, bool closeConnection = false) {
Expand Down Expand Up @@ -116,42 +189,7 @@ struct HttpResponse : public AsyncSocket<SSL> {
}

if (httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED) {

/* We do not have tryWrite-like functionalities, so ignore optional in this path */

/* Do not allow sending 0 chunk here */
if (data.length()) {
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);

/* Ignoring optional for now */
Super::write(data.data(), (int) data.length());
}

/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);

httpResponseData->markDone();

/* We need to check if we should close this socket here now */
if (!Super::isCorked()) {
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) == 0) {
if (((AsyncSocket<SSL> *) this)->getBufferedAmount() == 0) {
((AsyncSocket<SSL> *) this)->shutdown();
/* We need to force close after sending FIN since we want to hinder
* clients from keeping to send their huge data */
((AsyncSocket<SSL> *) this)->close();
return true;
}
}
}
}

/* tryEnd can never fail when in chunked mode, since we do not have tryWrite (yet), only write */
Super::timeout(HTTP_TIMEOUT_S);
return true;
return internalWriteChunk(data, false, true).first;
} else {
/* Write content-length on first call */
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_END_CALLED)) {
Expand Down Expand Up @@ -452,33 +490,23 @@ struct HttpResponse : public AsyncSocket<SSL> {
bool write(std::string_view data) {
writeStatus(HTTP_200_OK);

/* Do not allow sending 0 chunks, they mark end of response */
if (!data.length()) {
/* If you called us, then according to you it was fine to call us so it's fine to still call us */
return true;
return !(getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING);
}

HttpResponseData<SSL> *httpResponseData = getHttpResponseData();

if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();

writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
return !internalWriteChunk(data, false).second;
}

Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
/* Try and write one chunk. Continue with the remaining body suffix on onWritable. */
bool tryWrite(std::string_view data) {
Comment thread
GetThatCookie marked this conversation as resolved.
Outdated
writeStatus(HTTP_200_OK);

auto [written, failed] = Super::write(data.data(), (int) data.length());
if (failed) {
Super::timeout(HTTP_TIMEOUT_S);
if (!data.length()) {
return !(getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING);
}

/* If we did not fail the write, accept more */
return !failed;
auto [completed, failed] = internalWriteChunk(data, true);
return completed && !failed;
}

/* Get the current byte write offset for this Http response */
Expand Down
3 changes: 2 additions & 1 deletion src/HttpResponseData.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
HTTP_WRITE_CALLED = 2, // used
HTTP_END_CALLED = 4, // used
HTTP_RESPONSE_PENDING = 8, // used
HTTP_CONNECTION_CLOSE = 16 // used
HTTP_CONNECTION_CLOSE = 16, // used
HTTP_WRITE_CONTINUATION_PENDING = 32 // used
};

/* Per socket event handlers */
Expand Down
Loading
Loading