28 #ifndef WEBSOCKETPP_CONNECTION_IMPL_HPP
29 #define WEBSOCKETPP_CONNECTION_IMPL_HPP
31 #include <websocketpp/processors/hybi00.hpp>
32 #include <websocketpp/processors/hybi07.hpp>
33 #include <websocketpp/processors/hybi08.hpp>
34 #include <websocketpp/processors/hybi13.hpp>
36 #include <websocketpp/processors/processor.hpp>
38 #include <websocketpp/common/platforms.hpp>
39 #include <websocketpp/common/system_error.hpp>
50 namespace istate = session::internal_state;
52 template <
typename config>
54 termination_handler new_handler)
57 "connection set_termination_handler");
61 m_termination_handler = new_handler;
64 template <
typename config>
67 return m_processor->get_origin(m_request);
70 template <
typename config>
73 return m_send_buffer_size;
76 template <
typename config>
82 template <
typename config>
86 message_ptr msg = m_msg_manager->get_message(op,payload.size());
87 msg->append_payload(payload);
88 msg->set_compressed(
true);
93 template <
typename config>
97 message_ptr msg = m_msg_manager->get_message(op,len);
98 msg->append_payload(payload,len);
// Queues `msg` on the connection's send queue and returns an error code.
// NOTE(review): lines of this function are missing from this view (the
// embedded line numbers skip); comments describe only the visible statements.
103 template <
typename config>
104 lib::error_code
connection<config>::
send(
typename config::message_type::ptr msg)
// m_state is inspected while holding m_connection_state_lock; the body of the
// "not open" branch is not visible here.
111 scoped_lock_type lock(m_connection_state_lock);
112 if (m_state != session::state::open) {
117 message_ptr outgoing_msg;
118 bool needs_writing =
false;
// Branch for messages that report get_prepared(): outgoing_msg is pushed onto
// the send queue under m_write_lock.
120 if (msg->get_prepared()) {
123 scoped_lock_type lock(m_write_lock);
124 write_push(outgoing_msg);
// needs_writing is set when m_write_flag is clear and the queue is non-empty.
125 needs_writing = !m_write_flag && !m_send_queue.empty();
// Presumably the non-prepared path: a fresh message is obtained and the
// processor's prepare_data_frame is called with `msg` and that fresh message.
127 outgoing_msg = m_msg_manager->get_message();
133 scoped_lock_type lock(m_write_lock);
134 lib::error_code ec = m_processor->prepare_data_frame(msg,outgoing_msg);
140 write_push(outgoing_msg);
141 needs_writing = !m_write_flag && !m_send_queue.empty();
// A default-constructed error code is returned at the end of the visible body.
151 return lib::error_code();
154 template <
typename config>
161 scoped_lock_type lock(m_connection_state_lock);
162 if (m_state != session::state::open) {
163 std::stringstream ss;
164 ss <<
"connection::ping called from invalid state " << m_state;
171 message_ptr msg = m_msg_manager->get_message();
177 ec = m_processor->prepare_ping(payload,msg);
181 if (m_pong_timeout_handler) {
184 m_ping_timer->cancel();
187 if (m_pong_timeout_dur > 0) {
194 lib::placeholders::_1
201 m_elog->write(log
::elevel::warn,
"Warning: a pong_timeout_handler is \
202 set but the transport in use does not support timeouts.");
206 bool needs_writing =
false;
208 scoped_lock_type lock(m_write_lock);
210 needs_writing = !m_write_flag && !m_send_queue.empty();
220 ec = lib::error_code();
223 template<
typename config>
232 template<
typename config>
234 lib::error_code
const & ec)
246 if (m_pong_timeout_handler) {
247 m_pong_timeout_handler(m_connection_hdl,payload);
251 template <
typename config>
258 scoped_lock_type lock(m_connection_state_lock);
259 if (m_state != session::state::open) {
260 std::stringstream ss;
261 ss <<
"connection::pong called from invalid state " << m_state;
268 message_ptr msg = m_msg_manager->get_message();
274 ec = m_processor->prepare_pong(payload,msg);
277 bool needs_writing =
false;
279 scoped_lock_type lock(m_write_lock);
281 needs_writing = !m_write_flag && !m_send_queue.empty();
291 ec = lib::error_code();
294 template<
typename config>
303 template <
typename config>
305 std::string
const & reason, lib::error_code & ec)
312 std::string tr(reason,0,std::min<size_t>(reason.size(),
313 frame::limits::close_reason_size));
315 scoped_lock_type lock(m_connection_state_lock);
317 if (m_state != session::state::open) {
325 template<
typename config>
327 std::string
const & reason)
330 close(code,reason,ec);
340 template <
typename config>
352 template <
typename config>
354 if (m_interrupt_handler) {
355 m_interrupt_handler(m_connection_hdl);
359 template <
typename config>
361 m_alog->write(log
::alevel::devel,
"connection connection::pause_reading");
371 template <
typename config>
373 m_alog->write(log
::alevel::devel,
"connection connection::handle_pause_reading");
377 template <
typename config>
379 m_alog->write(log
::alevel::devel,
"connection connection::resume_reading");
389 template <
typename config>
405 template <
typename config>
408 return m_uri->get_secure();
411 template <
typename config>
414 return m_uri->get_host();
417 template <
typename config>
420 return m_uri->get_resource();
423 template <
typename config>
426 return m_uri->get_port();
429 template <
typename config>
435 template <
typename config>
446 template <
typename config>
448 return m_subprotocol;
451 template <
typename config>
454 return m_requested_subprotocols;
457 template <
typename config>
459 lib::error_code & ec)
467 if (value.empty() || std::find_if(value.begin(),value.end(),
474 m_requested_subprotocols.push_back(value);
477 template <
typename config>
480 this->add_subprotocol(value,ec);
487 template <
typename config>
489 lib::error_code & ec)
497 ec = lib::error_code();
501 std::vector<std::string>::iterator it;
503 it = std::find(m_requested_subprotocols.begin(),
504 m_requested_subprotocols.end(),
507 if (it == m_requested_subprotocols.end()) {
512 m_subprotocol = value;
515 template <
typename config>
518 this->select_subprotocol(value,ec);
525 template <
typename config>
528 return m_request.get_header(key);
531 template <
typename config>
534 return m_request.get_body();
537 template <
typename config>
540 return m_response.get_header(key);
544 template <
typename config>
547 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
548 throw exception(
"Call to set_status from invalid state",
551 m_response.set_status(code);
555 template <
typename config>
557 std::string
const & msg)
559 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
560 throw exception(
"Call to set_status from invalid state",
564 m_response.set_status(code,msg);
568 template <
typename config>
570 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
571 throw exception(
"Call to set_status from invalid state",
575 m_response.set_body(value);
579 template <
typename config>
581 std::string
const & val)
584 if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
586 m_response.append_header(key,val);
588 throw exception(
"Call to append_header from invalid state",
592 if (m_internal_state == istate::USER_INIT) {
594 m_request.append_header(key,val);
596 throw exception(
"Call to append_header from invalid state",
603 template <
typename config>
605 std::string
const & val)
608 if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
610 m_response.replace_header(key,val);
612 throw exception(
"Call to replace_header from invalid state",
616 if (m_internal_state == istate::USER_INIT) {
618 m_request.replace_header(key,val);
620 throw exception(
"Call to replace_header from invalid state",
627 template <
typename config>
631 if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
633 m_response.remove_header(key);
635 throw exception(
"Call to remove_header from invalid state",
639 if (m_internal_state == istate::USER_INIT) {
641 m_request.remove_header(key);
643 throw exception(
"Call to remove_header from invalid state",
660 template <
typename config>
664 if (m_handshake_timer) {
665 m_handshake_timer->cancel();
666 m_handshake_timer.reset();
670 m_http_state = session::http_state::deferred;
672 return lib::error_code();
685 template <
typename config>
688 scoped_lock_type lock(m_connection_state_lock);
689 if (m_http_state != session::http_state::deferred) {
694 m_http_state = session::http_state::body_written;
697 this->write_http_response(lib::error_code());
698 ec = lib::error_code();
701 template <
typename config>
704 this->send_http_response(ec);
715 template <
typename config>
719 if (m_internal_state != istate::USER_INIT) {
725 m_internal_state = istate::TRANSPORT_INIT;
732 &
type::handle_transport_init,
734 lib::placeholders::_1
// Callback taking the error code from transport initialisation.
// NOTE(review): lines of this function are missing from this view; the
// conditions selecting between the paths below are not visible.
739 template <
typename config>
740 void connection<config>::handle_transport_init(lib::error_code
const & ec) {
// Work on a local copy of the incoming error code.
743 lib::error_code ecm = ec;
// The handler expects m_internal_state to be istate::TRANSPORT_INIT.
745 if (m_internal_state != istate::TRANSPORT_INIT) {
747 "handle_transport_init must be called from transport init state");
753 s <<
"handle_transport_init received error: "<< ecm.message();
// One visible path terminates the connection with ecm.
756 this->terminate(ecm);
// One visible path moves to READ_HTTP_REQUEST and starts reading the handshake.
762 m_internal_state = istate::READ_HTTP_REQUEST;
763 this->read_handshake(1);
// Another visible path moves to WRITE_HTTP_REQUEST, obtains a processor for
// config::client_version and sends the HTTP request.
767 m_internal_state = istate::WRITE_HTTP_REQUEST;
768 m_processor = get_processor(config::client_version);
769 this->send_http_request();
// Starts an asynchronous read whose completion handler is
// handle_read_handshake.
// NOTE(review): lines of this function are missing from this view; the call
// that receives the timeout arguments and the use of `num_bytes` are not
// visible.
773 template <
typename config>
774 void connection<config>::read_handshake(size_t num_bytes) {
// Only when a positive open-handshake timeout is configured:
// handle_open_handshake_timeout is referenced as the callback.
777 if (m_open_handshake_timeout_dur > 0) {
779 m_open_handshake_timeout_dur,
781 &
type::handle_open_handshake_timeout,
783 lib::placeholders::_1
// Request the read from the transport; config::connection_read_buffer_size is
// one of the arguments.
788 transport_con_type::async_read_at_least(
791 config::connection_read_buffer_size,
793 &type::handle_read_handshake,
795 lib::placeholders::_1,
796 lib::placeholders::_2
// Completion handler for handshake reads: feeds received bytes to m_request
// and, once the request is ready, processes it and writes a response.
// NOTE(review): lines of this function are missing from this view (including
// error-handling branches); comments describe only the visible statements.
803 template <
typename config>
804 void connection<config>::handle_read_handshake(lib::error_code
const & ec,
805 size_t bytes_transferred)
// Work on a local copy of the incoming error code.
809 lib::error_code ecm = ec;
// State checks are made while holding m_connection_state_lock.
812 scoped_lock_type lock(m_connection_state_lock);
814 if (m_state == session::state::connecting) {
815 if (m_internal_state != istate::READ_HTTP_REQUEST) {
818 }
else if (m_state == session::state::closed) {
823 "handle_read_handshake invoked after connection was closed");
834 "got (expected) eof/state error from closed con");
839 this->terminate(ecm);
// Guard against a transfer size larger than the configured read buffer size.
844 if (bytes_transferred > config::connection_read_buffer_size) {
850 size_t bytes_processed = 0;
// Hand the received bytes to the request object; its result is kept as
// bytes_processed.
852 bytes_processed = m_request.consume(m_buf,bytes_transferred);
856 m_response.set_status(e.m_error_code,e.m_error_msg);
// Guard against the request reporting more bytes processed than transferred.
863 if (bytes_processed > bytes_transferred) {
871 s <<
"bytes_transferred: " << bytes_transferred
872 <<
" bytes, bytes processed: " << bytes_processed <<
" bytes";
// Request complete: initialise the processor.
876 if (m_request.ready()) {
877 lib::error_code processor_ec =
this->initialize_processor();
879 this->write_http_response_error(processor_ec);
// Processor version 0: when at least 8 unprocessed bytes remain, they are
// stored as the "Sec-WebSocket-Key3" header and counted as processed.
883 if (m_processor && m_processor->get_version() == 0) {
886 if (bytes_transferred-bytes_processed >= 8) {
887 m_request.replace_header(
888 "Sec-WebSocket-Key3",
889 std::string(m_buf+bytes_processed,m_buf+bytes_processed+8)
891 bytes_processed += 8;
903 if (!m_request.get_header(
"Sec-WebSocket-Key3").empty()) {
// Move the unprocessed bytes to the front of m_buf and record their count in
// m_buf_cursor.
912 std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
913 m_buf_cursor = bytes_transferred-bytes_processed;
916 m_internal_state = istate::PROCESS_HTTP_REQUEST;
919 lib::error_code handshake_ec =
this->process_handshake_request();
// The response is written when this is not an HTTP connection or when
// m_http_state is still session::http_state::init.
924 if (!m_is_http || m_http_state == session::http_state::init) {
925 this->write_http_response(handshake_ec);
// Issue another read with this function as the completion handler
// (presumably when the request is not yet ready; condition not visible).
929 transport_con_type::async_read_at_least(
932 config::connection_read_buffer_size,
934 &type::handle_read_handshake,
936 lib::placeholders::_1,
937 lib::placeholders::_2
// Moves the connection to PROCESS_HTTP_REQUEST and writes an HTTP response
// for the given error code.
// NOTE(review): the body of the invalid-state branch is only partly visible.
948 template <
typename config>
949 void connection<config>::write_http_response_error(lib::error_code
const & ec) {
// Expected to be called while m_internal_state is READ_HTTP_REQUEST.
950 if (m_internal_state != istate::READ_HTTP_REQUEST) {
952 "write_http_response_error called in invalid state");
957 m_internal_state = istate::PROCESS_HTTP_REQUEST;
959 this->write_http_response(ec);
// Completion handler for frame reads: passes the received bytes to
// m_processor and dispatches each completed message.
// NOTE(review): lines of this function are missing from this view (including
// the declaration of `p` and several branch conditions); comments describe
// only the visible statements.
964 template <
typename config>
965 void connection<config>::handle_read_frame(lib::error_code
const & ec,
966 size_t bytes_transferred)
// Work on a local copy of the incoming error code.
970 lib::error_code ecm = ec;
// No error was reported but the internal state is not PROCESS_CONNECTION.
972 if (!ecm && m_internal_state != istate::PROCESS_CONNECTION) {
980 if (m_state == session::state::closed) {
985 }
else if (m_state == session::state::closing && !m_is_server) {
989 terminate(lib::error_code());
997 if (m_state == session::state::closed) {
999 "handle_read_frame: got invalid istate in closed state");
1012 log_err(echannel,
"handle_read_frame", ecm);
1013 this->terminate(ecm);
1027 std::stringstream s;
1028 s <<
"p = " << p <<
" bytes transferred = " << bytes_transferred;
// Loop until `p` reaches bytes_transferred; `p` is advanced by the value the
// processor's consume() returns.
1032 while (p < bytes_transferred) {
1033 if (m_alog->static_test(log::alevel::devel)) {
1034 std::stringstream s;
1035 s <<
"calling consume with " << bytes_transferred-p <<
" bytes";
1036 m_alog->write(log::alevel::devel,s.str());
1039 lib::error_code consume_ec;
1041 if (m_alog->static_test(log::alevel::devel)) {
1042 std::stringstream s;
1043 s <<
"Processing Bytes: " << utility::to_hex(
reinterpret_cast<uint8_t*>(m_buf)+p,bytes_transferred-p);
1044 m_alog->write(log::alevel::devel,s.str());
// The processor is given the buffer starting at offset p and the remaining
// byte count.
1047 p += m_processor->consume(
1048 reinterpret_cast<uint8_t*>(m_buf)+p,
1049 bytes_transferred-p,
1053 if (m_alog->static_test(log::alevel::devel)) {
1054 std::stringstream s;
1055 s <<
"bytes left after consume: " << bytes_transferred-p;
1056 m_alog->write(log::alevel::devel,s.str());
1059 log_err(log::elevel::rerror,
"consume", consume_ec);
// With config::drop_on_protocol_error set, the connection is terminated with
// the consume error.
1061 if (config::drop_on_protocol_error) {
1062 this->terminate(consume_ec);
// The other visible path uses processor::error::to_ws(consume_ec) and
// consume_ec.message() as arguments to a call that is not visible here.
1065 lib::error_code close_ec;
1067 processor::error::to_ws(consume_ec),
1068 consume_ec.message(),
1073 log_err(log::elevel::fatal,
"Protocol error close frame ", close_ec);
1074 this->terminate(close_ec);
// The processor reports a complete message: fetch and dispatch it.
1081 if (m_processor->ready()) {
1082 if (m_alog->static_test(log::alevel::devel)) {
1083 std::stringstream s;
1084 s <<
"Complete message received. Dispatching";
1085 m_alog->write(log::alevel::devel,s.str());
1088 message_ptr msg = m_processor->get_message();
1091 m_alog->write(log::alevel::devel,
"null message from m_processor");
1092 }
else if (!is_control(msg->get_opcode())) {
// Non-control messages reach m_message_handler only while the state is open.
1094 if (m_state != session::state::open) {
1095 m_elog->write(log::elevel::warn,
"got non-close frame while closing");
1096 }
else if (m_message_handler) {
1097 m_message_handler(m_connection_hdl, msg);
// Presumably the control-frame path (enclosing else is not visible).
1100 process_control_frame(msg);
1109 template <
typename config>
1115 transport_con_type::async_read_at_least(
1125 config::connection_read_buffer_size,
1130 template <
typename config>
1135 if (!
processor::is_websocket_handshake(m_request)) {
1136 return lib::error_code();
1139 int version =
processor::get_websocket_version(m_request);
1142 m_alog->write(log
::alevel::devel,
"BAD REQUEST: can't determine version");
1147 m_processor = get_processor(version);
1151 return lib::error_code();
1156 m_alog->write(log
::alevel::devel,
"BAD REQUEST: no processor for version");
1159 std::stringstream ss;
1161 std::vector<
int>::const_iterator it;
1162 for (it = versions_supported.begin(); it != versions_supported.end(); it++)
1168 m_response.replace_header(
"Sec-WebSocket-Version",ss.str());
1172 template <
typename config>
1176 if (!
processor::is_websocket_handshake(m_request)) {
1181 m_uri = processor::get_uri_from_host(
1183 (transport_con_type::is_secure() ?
"https" :
"http")
1186 if (!m_uri->get_valid()) {
1187 m_alog->write(log
::alevel::devel,
"Bad request: failed to parse uri");
1192 if (m_http_handler) {
1194 m_http_handler(m_connection_hdl);
1196 if (m_state == session::state::closed) {
1204 return lib::error_code();
1207 lib::error_code ec = m_processor->validate_handshake(m_request);
1219 std::pair<lib::error_code,std::string> neg_results;
1220 neg_results = m_processor->negotiate_extensions(m_request);
1222 if (neg_results.first == processor::error::make_error_code(processor::error::extension_parse_error)) {
1225 m_elog->write(log::elevel::info,
"Bad request: " + neg_results.first.message());
1227 return neg_results.first;
1228 }
else if (neg_results.first) {
1232 m_elog->write(log::elevel::info,
1233 "Extension negotiation failed: " + neg_results.first.message());
1238 if (neg_results.second.size() > 0) {
1239 m_response.replace_header(
"Sec-WebSocket-Extensions",
1240 neg_results.second);
1245 m_uri = m_processor->get_uri(m_request);
1248 if (!m_uri->get_valid()) {
1249 m_alog->write(log
::alevel::devel,
"Bad request: failed to parse uri");
1255 lib::error_code subp_ec = m_processor->extract_subprotocols(m_request,
1256 m_requested_subprotocols);
1263 if (!m_validate_handler || m_validate_handler(m_connection_hdl)) {
1268 ec = m_processor->process_handshake(m_request,m_subprotocol,m_response);
1271 std::stringstream s;
1272 s <<
"Processing error: " << ec <<
"(" << ec.message() <<
")";
1285 if (m_response.get_status_code() ==
http::
status_code::uninitialized) {
1292 return lib::error_code();
// Finalises m_response, serialises it into m_handshake_buffer and passes that
// buffer to a write whose completion handler is handle_write_http_response.
// NOTE(review): lines of this function are missing from this view (including
// all use of `ec` and the write call's head); comments describe only the
// visible statements.
1295 template <
typename config>
1296 void connection<config>::write_http_response(lib::error_code
const & ec) {
1300 m_alog->write(log
::alevel::http,
"An HTTP handler took over the connection.");
// Branch taken when no status code has been set on the response yet.
1304 if (m_response.get_status_code() ==
http::
status_code::uninitialized) {
1311 m_response.set_version(
"HTTP/1.1");
// When no "Server" header is present: use m_user_agent if it is non-empty;
// the other visible statement removes the header.
1314 if (m_response.get_header(
"Server").empty()) {
1315 if (!m_user_agent.empty()) {
1316 m_response.replace_header(
"Server",m_user_agent);
1318 m_response.remove_header(
"Server");
// Two serialisation paths are visible: via the processor's get_raw, or via
// the response's own raw(). The selecting condition is not visible.
1324 m_handshake_buffer = m_processor->get_raw(m_response);
1327 m_handshake_buffer = m_response.raw();
1331 m_alog->write(log
::alevel::devel,
"Raw Handshake response:\n"+m_handshake_buffer);
1332 if (!m_response.get_header(
"Sec-WebSocket-Key3").empty()) {
1340 m_handshake_buffer.data(),
1341 m_handshake_buffer.size(),
1343 &
type::handle_write_http_response,
1345 lib::placeholders::_1
// Completion handler for the HTTP response write. On the visible success
// path the connection becomes open and frame processing starts.
// NOTE(review): lines of this function are missing from this view; comments
// describe only the visible statements.
1350 template <
typename config>
1351 void connection<config>::handle_write_http_response(lib::error_code
const & ec) {
// Work on a local copy of the incoming error code.
1354 lib::error_code ecm = ec;
// State checks are made while holding m_connection_state_lock.
1357 scoped_lock_type lock(m_connection_state_lock);
1359 if (m_state == session::state::connecting) {
1360 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
1363 }
else if (m_state == session::state::closed) {
1368 "handle_write_http_response invoked after connection was closed");
1379 "got (expected) eof/state error from closed con");
1384 this->terminate(ecm);
// Cancel and release the handshake timer if one exists.
1388 if (m_handshake_timer) {
1389 m_handshake_timer->cancel();
1390 m_handshake_timer.reset();
// Any status other than switching_protocols takes the branch below, which
// logs the HTTP result and terminates with m_ec.
1393 if (m_response.get_status_code() !=
http::
status_code::switching_protocols)
1400 std::stringstream s;
1401 s <<
"Handshake ended with HTTP error: "
1402 << m_response.get_status_code();
1408 this->log_http_result();
1412 "got to writing HTTP results with m_ec set: "+m_ec.message());
1417 this->terminate(m_ec);
// Visible success path: log, mark the connection open, notify the open
// handler if one is set.
1421 this->log_open_result();
1423 m_internal_state = istate::PROCESS_CONNECTION;
1424 m_state = session::state::open;
1426 if (m_open_handler) {
1427 m_open_handler(m_connection_hdl);
// Enter frame processing with m_buf_cursor as the byte count and no error.
1430 this->handle_read_frame(lib::error_code(), m_buf_cursor);
// Builds the handshake request in m_request, serialises it into
// m_handshake_buffer and passes it to a write whose completion handler is
// handle_send_http_request.
// NOTE(review): lines of this function are missing from this view (including
// the declaration of `ec` and the heads of the timer and write calls).
1433 template <
typename config>
1434 void connection<config>::send_http_request() {
// The processor fills in the request from m_uri and the requested
// subprotocols.
1443 ec = m_processor->client_handshake_request(m_request,m_uri,
1444 m_requested_subprotocols);
1451 m_elog->write(log
::elevel::fatal,
"Internal library error: missing processor");
// When no "User-Agent" header is present: use m_user_agent if it is
// non-empty; the other visible statement removes the header.
1456 if (m_request.get_header(
"User-Agent").empty()) {
1457 if (!m_user_agent.empty()) {
1458 m_request.replace_header(
"User-Agent",m_user_agent);
1460 m_request.remove_header(
"User-Agent");
1464 m_handshake_buffer = m_request.raw();
1467 m_alog->write(log
::alevel::devel,
"Raw Handshake request:\n"+m_handshake_buffer);
// Only when a positive open-handshake timeout is configured:
// handle_open_handshake_timeout is referenced as the callback.
1470 if (m_open_handshake_timeout_dur > 0) {
1472 m_open_handshake_timeout_dur,
1474 &
type::handle_open_handshake_timeout,
1476 lib::placeholders::_1
1482 m_handshake_buffer.data(),
1483 m_handshake_buffer.size(),
1485 &
type::handle_send_http_request,
1487 lib::placeholders::_1
// Completion handler for the handshake request write; on the visible path it
// moves to READ_HTTP_RESPONSE and starts reading the response.
// NOTE(review): lines of this function are missing from this view; comments
// describe only the visible statements.
1492 template <
typename config>
1493 void connection<config>::handle_send_http_request(lib::error_code
const & ec) {
// Work on a local copy of the incoming error code.
1496 lib::error_code ecm = ec;
// State checks are made while holding m_connection_state_lock.
1499 scoped_lock_type lock(m_connection_state_lock);
1501 if (m_state == session::state::connecting) {
1502 if (m_internal_state != istate::WRITE_HTTP_REQUEST) {
1505 m_internal_state = istate::READ_HTTP_RESPONSE;
1507 }
else if (m_state == session::state::closed) {
1512 "handle_send_http_request invoked after connection was closed");
1523 "got (expected) eof/state error from closed con");
1528 this->terminate(ecm);
// Start reading; handle_read_http_response is the completion handler.
1532 transport_con_type::async_read_at_least(
1535 config::connection_read_buffer_size,
1537 &type::handle_read_http_response,
1539 lib::placeholders::_1,
1540 lib::placeholders::_2
// Completion handler for handshake response reads: feeds received bytes to
// m_response and, once headers are ready, validates the handshake and opens
// the connection.
// NOTE(review): lines of this function are missing from this view; comments
// describe only the visible statements.
1545 template <
typename config>
1546 void connection<config>::handle_read_http_response(lib::error_code
const & ec,
1547 size_t bytes_transferred)
// Work on a local copy of the incoming error code.
1551 lib::error_code ecm = ec;
// State checks are made while holding m_connection_state_lock.
1554 scoped_lock_type lock(m_connection_state_lock);
1556 if (m_state == session::state::connecting) {
1557 if (m_internal_state != istate::READ_HTTP_RESPONSE) {
1560 }
else if (m_state == session::state::closed) {
1565 "handle_read_http_response invoked after connection was closed");
1576 "got (expected) eof/state error from closed con");
1581 this->terminate(ecm);
1585 size_t bytes_processed = 0;
// Hand the received bytes to the response object; its result is kept as
// bytes_processed.
1588 bytes_processed = m_response.consume(m_buf,bytes_transferred);
1591 std::string(
"error in handle_read_http_response: ")+e.what());
1596 m_alog->write(log
::alevel::devel,std::string(
"Raw response: ")+m_response.raw());
// Headers complete: cancel and release the handshake timer, then validate.
1598 if (m_response.headers_ready()) {
1599 if (m_handshake_timer) {
1600 m_handshake_timer->cancel();
1601 m_handshake_timer.reset();
1604 lib::error_code validate_ec = m_processor->validate_server_handshake_response(
1610 this->terminate(validate_ec);
// Extension negotiation against the response; first is an error code, second
// a string.
1616 std::pair<lib::error_code,std::string> neg_results;
1617 neg_results = m_processor->negotiate_extensions(m_response);
1619 if (neg_results.first) {
1627 m_alog->write(log::alevel::devel,
"Extension negotiation failed: "
1628 + neg_results.first.message());
// Mark the connection open, log, and notify the open handler if one is set.
1634 m_internal_state = istate::PROCESS_CONNECTION;
1635 m_state = session::state::open;
1637 this->log_open_result();
1639 if (m_open_handler) {
1640 m_open_handler(m_connection_hdl);
// Move the unprocessed bytes to the front of m_buf, record their count in
// m_buf_cursor, and enter frame processing with that count.
1646 std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
1647 m_buf_cursor = bytes_transferred-bytes_processed;
1649 this->handle_read_frame(lib::error_code(), m_buf_cursor);
// Issue another read with this function as the completion handler
// (presumably when headers are not yet ready; condition not visible).
1651 transport_con_type::async_read_at_least(
1654 config::connection_read_buffer_size,
1656 &type::handle_read_http_response,
1658 lib::placeholders::_1,
1659 lib::placeholders::_2
// Timer callback referenced by read_handshake and send_http_request for the
// open-handshake timeout.
// NOTE(review): almost all of the body is missing from this view; only one
// message string that includes ec.message() is visible.
1665 template <
typename config>
1666 void connection<config>::handle_open_handshake_timeout(
1667 lib::error_code
const & ec)
1673 "open handle_open_handshake_timeout error: "+ec.message());
// Timer callback for the close-handshake timeout.
// NOTE(review): the conditions selecting between the three visible log
// messages (cancelled / error / expired) are missing from this view, as is
// any action taken after logging.
1681 template <
typename config>
1682 void connection<config>::handle_close_handshake_timeout(
1683 lib::error_code
const & ec)
1686 m_alog->write(log
::alevel::devel,
"asio close handshake timer cancelled");
1689 "asio open handle_close_handshake_timeout error: "+ec.message());
1692 m_alog->write(log
::alevel::devel,
"asio close handshake timer expired");
// Moves the connection to the closed state; handle_terminate is referenced as
// the follow-up callback.
// NOTE(review): lines of this function are missing from this view (including
// the assignments to tstat and the call that receives the callback).
1697 template <
typename config>
1698 void connection<config>::terminate(lib::error_code
const & ec) {
// Cancel and release the handshake timer if one exists.
1704 if (m_handshake_timer) {
1705 m_handshake_timer->cancel();
1706 m_handshake_timer.reset();
1709 terminate_status tstat = unknown;
// The error's message is recorded as the local close reason (the enclosing
// condition, if any, is not visible).
1713 m_local_close_reason = ec.message();
1718 m_http_state = session::http_state::closed;
// Both the "connecting" branch and the "not yet closed" branch set m_state to
// closed; the remaining case reports an already-terminated connection.
1720 if (m_state == session::state::connecting) {
1721 m_state = session::state::closed;
1729 }
else if (m_state != session::state::closed) {
1730 m_state = session::state::closed;
1734 "terminate called on connection that was already terminated");
1742 &
type::handle_terminate,
1745 lib::placeholders::_1
// Follow-up to terminate(): invokes the fail or close handler according to
// tstat, then deals with the termination handler.
// NOTE(review): lines of this function are missing from this view (including
// all use of `ec` and the termination-handler call inside the try block).
1750 template <
typename config>
1751 void connection<config>::handle_terminate(terminate_status tstat,
1752 lib::error_code
const & ec)
// tstat == failed: call the fail handler if one is set.
1764 if (tstat == failed) {
1766 if (m_fail_handler) {
1767 m_fail_handler(m_connection_hdl);
1770 }
// tstat == closed: call the close handler if one is set.
else if (tstat == closed) {
1771 if (m_close_handler) {
1772 m_close_handler(m_connection_hdl);
// A std::exception raised in the guarded region is caught and its what()
// text is included in a message.
1782 if (m_termination_handler) {
1785 }
catch (std::exception
const & e) {
1787 std::string(
"termination_handler call failed. Reason was: ")+e.what());
1792 template <
typename config>
1797 scoped_lock_type lock(m_write_lock);
1809 message_ptr next_message = write_pop();
1810 while (next_message) {
1811 m_current_msgs.push_back(next_message);
1812 if (!next_message->get_terminal()) {
1813 next_message = write_pop();
1815 next_message = message_ptr();
1819 if (m_current_msgs.empty()) {
1826 m_write_flag =
true;
1830 typename std::vector<message_ptr>::iterator it;
1831 for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) {
1832 std::string
const & header = (*it)->get_header();
1833 std::string
const & payload = (*it)->get_payload();
1835 m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
1836 m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
1842 std::stringstream general,header,payload;
1844 general <<
"Dispatching write containing " << m_current_msgs.size()
1845 <<
" message(s) containing ";
1846 header <<
"Header Bytes: \n";
1847 payload <<
"Payload Bytes: \n";
1852 for (size_t i = 0; i < m_current_msgs.size(); i++) {
1853 hbytes += m_current_msgs[i]->get_header().size();
1854 pbytes += m_current_msgs[i]->get_payload().size();
1857 header <<
"[" << i <<
"] ("
1858 << m_current_msgs[i]->get_header().size() <<
") "
1859 << utility::to_hex(m_current_msgs[i]->get_header()) <<
"\n";
1861 if (m_alog->static_test(log::alevel::frame_payload)) {
1862 if (m_alog->dynamic_test(log::alevel::frame_payload)) {
1863 payload <<
"[" << i <<
"] ("
1864 << m_current_msgs[i]->get_payload().size() <<
") ["<<m_current_msgs[i]->get_opcode()<<
"] "
1865 << (m_current_msgs[i]->get_opcode() == frame::opcode::text ?
1866 m_current_msgs[i]->get_payload() :
1867 utility::to_hex(m_current_msgs[i]->get_payload())
1874 general << hbytes <<
" header bytes and " << pbytes <<
" payload bytes";
1882 transport_con_type::async_write(
1884 m_write_frame_handler
1888 template <
typename config>
1895 bool terminal = m_current_msgs.back()->get_terminal();
1897 m_send_buffer.clear();
1898 m_current_msgs.clear();
1903 this->terminate(ec);
1908 this->terminate(lib::error_code());
1912 bool needs_writing =
false;
1914 scoped_lock_type lock(m_write_lock);
1917 m_write_flag =
false;
1919 needs_writing = !m_send_queue.empty();
1922 if (needs_writing) {
1930 template <
typename config>
1933 return versions_supported;
// Handles a received control message: ping, pong and close paths are visible,
// plus a log line for an invalid opcode.
// NOTE(review): lines of this function are missing from this view (including
// the declarations of `op` and `ec` and the opcode dispatch conditions);
// comments describe only the visible statements.
1936 template <
typename config>
1937 void connection<config>::process_control_frame(
typename config::message_type::ptr msg)
1944 std::stringstream s;
1945 s <<
"Control frame received with opcode " << op;
1948 if (m_state == session::state::closed) {
// A non-CLOSE opcode while the state is not open produces a warning.
1952 if (op !=
frame::
opcode::CLOSE && m_state != session::state::open) {
1953 m_elog->write(log
::elevel::warn,
"got non-close frame in state closing");
// Ping path: should_reply defaults to true and is replaced by the ping
// handler's return value when a handler is set. The pong call is visible,
// but the check of should_reply is not.
1958 bool should_reply =
true;
1960 if (m_ping_handler) {
1961 should_reply = m_ping_handler(m_connection_hdl, msg->get_payload());
1965 this->pong(msg->get_payload(),ec);
// Pong path: notify the pong handler if one is set; the ping timer is
// cancelled (enclosing condition not visible).
1971 if (m_pong_handler) {
1972 m_pong_handler(m_connection_hdl, msg->get_payload());
1975 m_ping_timer->cancel();
// Invalid close code: either drop the connection (per config) or take the
// acknowledge-and-close path.
1984 if (config::drop_on_protocol_error) {
1985 s <<
"Received invalid close code " << m_remote_close_code
1986 <<
" dropping connection per config.";
1988 this->terminate(ec);
1990 s <<
"Received invalid close code " << m_remote_close_code
1991 <<
" sending acknowledgement and closing";
1994 "Invalid close code");
// Invalid close reason: same two options as for an invalid close code.
2004 if (config::drop_on_protocol_error) {
2006 "Received invalid close reason. Dropping connection per config");
2007 this->terminate(ec);
2010 "Received invalid close reason. Sending acknowledgement and closing");
2012 "Invalid close reason");
// Close received while open: send a close acknowledgement.
2020 if (m_state == session::state::open) {
2022 s <<
"Received close frame with code " << m_remote_close_code
2023 <<
" and reason " << m_remote_close_reason;
2026 ec = send_close_ack();
2030 }
// Close received while already closing and not yet marked clean.
else if (m_state == session::state::closing && !m_was_clean) {
2044 terminate(lib::error_code());
2052 m_elog->write(log
::elevel::devel,
"Got control frame with invalid opcode");
2057 template <
typename config>
2059 std::string
const & reason)
2061 return send_close_frame(code,reason,
true,m_is_server);
2064 template <
typename config>
2066 std::string
const & reason,
bool ack,
bool terminal)
2078 if (config::silent_close) {
2081 m_local_close_reason.clear();
2084 m_local_close_code = code;
2085 m_local_close_reason = reason;
2089 m_local_close_reason.clear();
2092 "acknowledging a no-status close with normal code");
2094 m_local_close_reason.clear();
2097 m_local_close_code = m_remote_close_code;
2098 m_local_close_reason = m_remote_close_reason;
2101 std::stringstream s;
2102 s <<
"Closing with code: " << m_local_close_code <<
", and reason: "
2103 << m_local_close_reason;
2106 message_ptr msg = m_msg_manager->get_message();
2111 lib::error_code ec = m_processor->prepare_close(m_local_close_code,
2112 m_local_close_reason,msg);
2121 msg->set_terminal(
true);
2124 m_state = session::state::closing;
2132 if (m_close_handshake_timeout_dur > 0) {
2134 m_close_handshake_timeout_dur,
2136 &
type::handle_close_handshake_timeout,
2138 lib::placeholders::_1
2143 bool needs_writing =
false;
2145 scoped_lock_type lock(m_write_lock);
2147 needs_writing = !m_write_flag && !m_send_queue.empty();
2150 if (needs_writing) {
2157 return lib::error_code();
// Const member taking a protocol version number.
// NOTE(review): the return type and nearly all of the body are missing from
// this view; the only visible statement applies m_max_message_size to `p`.
2160 template <
typename config>
2162 connection<config>::get_processor(
int version)
const {
2204 p->set_max_message_size(m_max_message_size);
// Appends `msg` to m_send_queue and adds its payload size to
// m_send_buffer_size.
// NOTE(review): lines of this function are missing from this view (including
// any null check and the call that emits the log string).
2209 template <
typename config>
2210 void connection<config>::write_push(
typename config::message_type::ptr msg)
2216 m_send_buffer_size += msg->get_payload().size();
2217 m_send_queue.push(msg);
// Build a log line with the queue length and buffered byte count.
2220 std::stringstream s;
2221 s <<
"write_push: message count: " << m_send_queue.size()
2222 <<
" buffer size: " << m_send_buffer_size;
// Takes the front message of m_send_queue and subtracts its payload size from
// m_send_buffer_size.
// NOTE(review): lines of this function are missing from this view (including
// the declaration of `msg`, the empty-queue branch body, the removal from the
// queue and the return statement).
2227 template <
typename config>
2228 typename config::message_type::ptr
connection<config>::write_pop()
2232 if (m_send_queue.empty()) {
2236 msg = m_send_queue.front();
2238 m_send_buffer_size -= msg->get_payload().size();
// Build a log line with the queue length and buffered byte count.
2242 std::stringstream s;
2243 s <<
"write_pop: message count: " << m_send_queue.size()
2244 <<
" buffer size: " << m_send_buffer_size;
2250 template <
typename config>
2253 std::stringstream s;
2256 if (!
processor::is_websocket_handshake(m_request)) {
2259 version =
processor::get_websocket_version(m_request);
2263 s << (version == -1 ?
"HTTP" :
"WebSocket") <<
" Connection ";
2269 if (version != -1) {
2270 s <<
"v" << version <<
" ";
2274 std::string ua = m_request.get_header(
"User-Agent");
2283 s << (m_uri ? m_uri->get_resource() :
"NULL") <<
" ";
2286 s << m_response.get_status_code();
2291 template <
typename config>
2294 std::stringstream s;
2297 <<
"close local:[" << m_local_close_code
2298 << (m_local_close_reason.empty() ?
"" :
","+m_local_close_reason)
2299 <<
"] remote:[" << m_remote_close_code
2300 << (m_remote_close_reason.empty() ?
"" :
","+m_remote_close_reason) <<
"]";
2305 template <
typename config>
2308 std::stringstream s;
2310 int version =
processor::get_websocket_version(m_request);
2313 s <<
"WebSocket Connection ";
2320 s <<
" v" << version;
2324 std::string ua = m_request.get_header(
"User-Agent");
2333 s << (m_uri ? m_uri->get_resource() :
"-");
2336 s <<
" " << m_response.get_status_code();
2339 s <<
" " << m_ec <<
" " << m_ec.message();
2344 template <
typename config>
2346 std::stringstream s;
2348 if (
processor::is_websocket_handshake(m_request)) {
2349 m_alog->write(log
::alevel::devel,
"Call to log_http_result for WebSocket");
2354 s << (m_request.get_header(
"host").empty() ?
"-" : m_request.get_header(
"host"))
2355 <<
" " << transport_con_type::get_remote_endpoint()
2356 <<
" \"" << m_request.get_method()
2357 <<
" " << (m_uri ? m_uri->get_resource() :
"-")
2358 <<
" " << m_request.get_version() <<
"\" " << m_response.get_status_code()
2359 <<
" " << m_response.get_body().size();
2362 std::string ua = m_request.get_header(
"User-Agent");