Exit gracefully if a port is bound by another process

This commit is contained in:
loki 2020-04-26 23:37:47 +02:00
parent 1a233ca4aa
commit e10c9a1fa1
5 changed files with 94 additions and 19 deletions

@ -832,8 +832,18 @@ void start(std::shared_ptr<safe::signal_t> shutdown_event) {
http_server.config.address = "0.0.0.0"s;
http_server.config.port = PORT_HTTP;
std::thread ssl { &https_server_t::start, &https_server };
std::thread tcp { &http_server_t::start, &http_server };
try {
https_server.bind();
http_server.bind();
} catch(boost::system::system_error &err) {
BOOST_LOG(fatal) << "Couldn't bind http server to ports ["sv << PORT_HTTPS << ", "sv << PORT_HTTP << "]: "sv << err.what();
shutdown_event->raise(true);
return;
}
std::thread ssl { &https_server_t::accept_and_run, &https_server };
std::thread tcp { &http_server_t::accept_and_run, &http_server };
// Wait for any event
shutdown_event->view();

@ -54,11 +54,13 @@ public:
}
}
void bind(std::uint16_t port) {
int bind(std::uint16_t port) {
_session_slots.resize(config::stream.channels);
_slot_count = config::stream.channels;
_host = net::host_create(_addr, 1, port);
return !(bool)_host;
}
void session_raise(launch_session_t launch_session) {
@ -157,16 +159,21 @@ public:
}
}
bool accept(const std::shared_ptr<session_t> &session) {
void clear(std::shared_ptr<session_t> *session_p) {
session_p->reset();
++_slot_count;
}
std::shared_ptr<session_t> *accept(std::shared_ptr<session_t> &session) {
for(auto &slot : _session_slots) {
if(!slot) {
slot = session;
return true;
return &slot;
}
}
return false;
return nullptr;
}
net::host_t::pointer host() const {
@ -412,14 +419,22 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) {
}
auto session = session::alloc(config, launch_session->gcm_key, launch_session->iv);
if(!server->accept(session)) {
auto slot = server->accept(session);
if(!slot) {
BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']';
respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
session::start(*session, platf::from_sockaddr((sockaddr*)&peer->address.address));
if(session::start(*session, platf::from_sockaddr((sockaddr*)&peer->address.address))) {
BOOST_LOG(error) << "Failed to start a streaming session"sv;
server->clear(slot);
respond(server->host(), peer, &option, 500, "Internal Server Error", req->sequenceNumber, {});
return;
}
respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {});
}
@ -444,7 +459,13 @@ void rtpThread(std::shared_ptr<safe::signal_t> shutdown_event) {
server.map("PLAY"sv, &cmd_play);
server.bind(RTSP_SETUP_PORT);
if(server.bind(RTSP_SETUP_PORT)) {
BOOST_LOG(fatal) << "Couldn't bind RTSP server to port ["sv << RTSP_SETUP_PORT << "], likely another process already bound to the port"sv;
shutdown_event->raise(true);
return;
}
while(!shutdown_event->peek()) {
server.iterate(std::min(500ms, config::stream.ping_timeout));

@ -99,10 +99,11 @@ static inline void while_starting_do_nothing(std::atomic<session::state_e> &stat
class control_server_t {
public:
control_server_t(control_server_t &&) noexcept = default;
control_server_t &operator=(control_server_t &&) noexcept = default;
int bind(std::uint16_t port) {
_host = net::host_create(_addr, config::stream.channels, port);
explicit control_server_t(std::uint16_t port) : _host { net::host_create(_addr, config::stream.channels, port) } {}
return !(bool)_host;
}
void emplace_addr_to_session(const std::string &addr, session_t &session) {
auto lg = _map_addr_session.lock();
@ -160,9 +161,9 @@ struct broadcast_ctx_t {
asio::io_service io;
udp::socket video_sock { io, udp::endpoint(udp::v4(), VIDEO_STREAM_PORT) };
udp::socket audio_sock { io, udp::endpoint(udp::v4(), AUDIO_STREAM_PORT) };
control_server_t control_server { CONTROL_PORT };
udp::socket video_sock { io };
udp::socket audio_sock { io };
control_server_t control_server;
};
struct session_t {
@ -718,6 +719,41 @@ void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, aud
}
int start_broadcast(broadcast_ctx_t &ctx) {
if(ctx.control_server.bind(CONTROL_PORT)) {
BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << CONTROL_PORT << "], likely another process already bound to the port"sv;
return -1;
}
boost::system::error_code ec;
ctx.video_sock.open(udp::v4(), ec);
if(ec) {
BOOST_LOG(fatal) << "Couldn't open socket for Video server: "sv << ec.message();
return -1;
}
ctx.video_sock.bind(udp::endpoint(udp::v4(), VIDEO_STREAM_PORT), ec);
if(ec) {
BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << VIDEO_STREAM_PORT << "]: "sv << ec.message();
return -1;
}
ctx.audio_sock.open(udp::v4(), ec);
if(ec) {
BOOST_LOG(fatal) << "Couldn't open socket for Audio server: "sv << ec.message();
return -1;
}
ctx.audio_sock.bind(udp::endpoint(udp::v4(), AUDIO_STREAM_PORT), ec);
if(ec) {
BOOST_LOG(fatal) << "Couldn't bind Audio server to port ["sv << AUDIO_STREAM_PORT << "]: "sv << ec.message();
return -1;
}
ctx.video_packets = std::make_shared<video::packet_queue_t::element_type>(30);
ctx.audio_packets = std::make_shared<audio::packet_queue_t::element_type>(30);
ctx.message_queue_queue = std::make_shared<message_queue_queue_t::element_type>(30);
@ -861,10 +897,14 @@ void join(session_t &session) {
BOOST_LOG(debug) << "Session ended"sv;
}
void start(session_t &session, const std::string &addr_string) {
int start(session_t &session, const std::string &addr_string) {
session.input = input::alloc();
session.broadcast_ref = broadcast.ref();
if(!session.broadcast_ref) {
return -1;
}
session.broadcast_ref->control_server.emplace_addr_to_session(addr_string, session);
session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
@ -873,6 +913,8 @@ void start(session_t &session, const std::string &addr_string) {
session.videoThread = std::thread {videoThread, &session, addr_string};
session.state.store(state_e::RUNNING, std::memory_order_relaxed);
return 0;
}
std::shared_ptr<session_t> alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) {

@ -31,7 +31,7 @@ enum class state_e : int {
};
std::shared_ptr<session_t> alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv);
void start(session_t &session, const std::string &addr_string);
int start(session_t &session, const std::string &addr_string);
void stop(session_t &session);
void join(session_t &session);
state_e state(session_t &session);

@ -311,13 +311,15 @@ public:
[[nodiscard]] ptr_t ref() {
std::lock_guard lg { _lock };
if(!_count++) {
if(!_count) {
new(_object_buf.data()) element_type;
if(_construct(*reinterpret_cast<element_type*>(_object_buf.data()))) {
return ptr_t { nullptr };
}
}
++_count;
return ptr_t { this };
}
private: