Commit f335a349 authored by Rami Jung's avatar Rami Jung Committed by Kishen Maloor

Reduce complexity of network_event_thread()

in order to improve readability and reduce complexity,
network_event_thread uses following subfuncions
- setting UDP/TCP sockets
- receiving UDP/TCP messages

Change-Id: Idd4642ca9d83944e008a130b336378b7a6f38c36
Signed-off-by: default avatarRami Jung <rami.jung@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/27529Tested-by: default avatarIoTivity Jenkins <jenkins-daemon@iotivity.org>
Reviewed-by: Kishen Maloor's avatarKishen Maloor <kishen.maloor@intel.com>
parent c7d17544
...@@ -760,20 +760,9 @@ recv_msg(int sock, uint8_t *recv_buf, int recv_buf_size, ...@@ -760,20 +760,9 @@ recv_msg(int sock, uint8_t *recv_buf, int recv_buf_size,
return ret; return ret;
} }
static void * static void
network_event_thread(void *data) oc_udp_add_socks_to_fd_set(ip_context_t *dev)
{ {
ip_context_t *dev = (ip_context_t *)data;
fd_set setfds;
FD_ZERO(&dev->rfds);
/* Monitor network interface changes on the platform from only the 0th logical
* device
*/
if (dev->device == 0) {
FD_SET(ifchange_sock, &dev->rfds);
}
FD_SET(dev->shutdown_pipe[0], &dev->rfds);
FD_SET(dev->server_sock, &dev->rfds); FD_SET(dev->server_sock, &dev->rfds);
FD_SET(dev->mcast_sock, &dev->rfds); FD_SET(dev->mcast_sock, &dev->rfds);
#ifdef OC_SECURITY #ifdef OC_SECURITY
...@@ -787,7 +776,107 @@ network_event_thread(void *data) ...@@ -787,7 +776,107 @@ network_event_thread(void *data)
FD_SET(dev->secure4_sock, &dev->rfds); FD_SET(dev->secure4_sock, &dev->rfds);
#endif /* OC_SECURITY */ #endif /* OC_SECURITY */
#endif /* OC_IPV4 */ #endif /* OC_IPV4 */
}
static adapter_receive_state_t
oc_udp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
{
if (FD_ISSET(dev->server_sock, fds)) {
int count = recv_msg(dev->server_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6;
FD_CLR(dev->server_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
if (FD_ISSET(dev->mcast_sock, fds)) {
int count = recv_msg(dev->mcast_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | MULTICAST;
FD_CLR(dev->mcast_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#ifdef OC_IPV4
if (FD_ISSET(dev->server4_sock, fds)) {
int count = recv_msg(dev->server4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4;
FD_CLR(dev->server4_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
if (FD_ISSET(dev->mcast4_sock, fds)) {
int count = recv_msg(dev->mcast4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | MULTICAST;
FD_CLR(dev->mcast4_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#endif /* OC_IPV4 */
#ifdef OC_SECURITY
if (FD_ISSET(dev->secure_sock, fds)) {
int count = recv_msg(dev->secure_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | SECURED;
FD_CLR(dev->secure_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#ifdef OC_IPV4
if (FD_ISSET(dev->secure4_sock, fds)) {
int count = recv_msg(dev->secure4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
return ADAPTER_STATUS_ERROR;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | SECURED;
FD_CLR(dev->secure4_sock, fds);
return ADAPTER_STATUS_RECEIVE;
}
#endif /* OC_IPV4 */
#endif /* OC_SECURITY */
return ADAPTER_STATUS_NONE;
}
static void *
network_event_thread(void *data)
{
ip_context_t *dev = (ip_context_t *)data;
fd_set setfds;
FD_ZERO(&dev->rfds);
/* Monitor network interface changes on the platform from only the 0th logical
* device
*/
if (dev->device == 0) {
FD_SET(ifchange_sock, &dev->rfds);
}
FD_SET(dev->shutdown_pipe[0], &dev->rfds);
oc_udp_add_socks_to_fd_set(dev);
#ifdef OC_TCP #ifdef OC_TCP
oc_tcp_add_socks_to_fd_set(dev); oc_tcp_add_socks_to_fd_set(dev);
#endif /* OC_TCP */ #endif /* OC_TCP */
...@@ -829,101 +918,20 @@ network_event_thread(void *data) ...@@ -829,101 +918,20 @@ network_event_thread(void *data)
message->endpoint.device = dev->device; message->endpoint.device = dev->device;
if (FD_ISSET(dev->server_sock, &setfds)) { if (oc_udp_receive_message(dev, &setfds, message) ==
int count = recv_msg(dev->server_sock, message->data, OC_PDU_SIZE, ADAPTER_STATUS_RECEIVE) {
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6;
FD_CLR(dev->server_sock, &setfds);
goto common;
}
if (FD_ISSET(dev->mcast_sock, &setfds)) {
int count = recv_msg(dev->mcast_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | MULTICAST;
FD_CLR(dev->mcast_sock, &setfds);
goto common;
}
#ifdef OC_IPV4
if (FD_ISSET(dev->server4_sock, &setfds)) {
int count = recv_msg(dev->server4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4;
FD_CLR(dev->server4_sock, &setfds);
goto common;
}
if (FD_ISSET(dev->mcast4_sock, &setfds)) {
int count = recv_msg(dev->mcast4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | MULTICAST;
FD_CLR(dev->mcast4_sock, &setfds);
goto common;
}
#endif /* OC_IPV4 */
#ifdef OC_SECURITY
if (FD_ISSET(dev->secure_sock, &setfds)) {
int count = recv_msg(dev->secure_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV6 | SECURED;
FD_CLR(dev->secure_sock, &setfds);
goto common; goto common;
} }
#ifdef OC_IPV4
if (FD_ISSET(dev->secure4_sock, &setfds)) {
int count = recv_msg(dev->secure4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
oc_message_unref(message);
continue;
}
message->length = (size_t)count;
message->endpoint.flags = IPV4 | SECURED;
FD_CLR(dev->secure4_sock, &setfds);
goto common;
}
#endif /* OC_IPV4 */
#endif /* OC_SECURITY */
#ifdef OC_TCP #ifdef OC_TCP
tcp_receive_state_t tcp_status = oc_tcp_receive_message(dev, if (oc_tcp_receive_message(dev, &setfds, message) ==
&setfds, ADAPTER_STATUS_RECEIVE) {
message);
if (tcp_status == TCP_STATUS_RECEIVE) {
goto common; goto common;
} else {
oc_message_unref(message);
continue;
} }
#endif /* OC_TCP */ #endif /* OC_TCP */
oc_message_unref(message);
continue;
common: common:
#ifdef OC_DEBUG #ifdef OC_DEBUG
PRINT("Incoming message of size %d bytes from ", message->length); PRINT("Incoming message of size %d bytes from ", message->length);
......
...@@ -30,8 +30,16 @@ extern "C" ...@@ -30,8 +30,16 @@ extern "C"
{ {
#endif #endif
typedef enum {
ADAPTER_STATUS_NONE = 0, /* Nothing happens */
ADAPTER_STATUS_ACCEPT, /* Receiving no meaningful data */
ADAPTER_STATUS_RECEIVE, /* Receiving meaningful data */
ADAPTER_STATUS_ERROR /* Error */
} adapter_receive_state_t;
#ifdef OC_TCP #ifdef OC_TCP
typedef struct tcp_context_t { typedef struct tcp_context_t
{
struct sockaddr_storage server; struct sockaddr_storage server;
int server_sock; int server_sock;
uint16_t port; uint16_t port;
......
...@@ -300,7 +300,7 @@ get_total_length_from_header(oc_message_t *message, oc_endpoint_t *endpoint) ...@@ -300,7 +300,7 @@ get_total_length_from_header(oc_message_t *message, oc_endpoint_t *endpoint)
return total_length; return total_length;
} }
tcp_receive_state_t adapter_receive_state_t
oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
{ {
pthread_mutex_lock(&dev->tcp.mutex); pthread_mutex_lock(&dev->tcp.mutex);
...@@ -309,7 +309,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) ...@@ -309,7 +309,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
ret = status; \ ret = status; \
goto oc_tcp_receive_message_done goto oc_tcp_receive_message_done
tcp_receive_state_t ret = TCP_STATUS_ERROR; adapter_receive_state_t ret = ADAPTER_STATUS_ERROR;
message->endpoint.device = dev->device; message->endpoint.device = dev->device;
if (FD_ISSET(dev->tcp.server_sock, fds)) { if (FD_ISSET(dev->tcp.server_sock, fds)) {
...@@ -317,18 +317,18 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) ...@@ -317,18 +317,18 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
if (accept_new_session(dev, dev->tcp.server_sock, fds, &message->endpoint) < if (accept_new_session(dev, dev->tcp.server_sock, fds, &message->endpoint) <
0) { 0) {
OC_ERR("accept new session fail"); OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} }
ret_with_code(TCP_STATUS_ACCEPT); ret_with_code(ADAPTER_STATUS_ACCEPT);
#ifdef OC_SECURITY #ifdef OC_SECURITY
} else if (FD_ISSET(dev->tcp.secure_sock, fds)) { } else if (FD_ISSET(dev->tcp.secure_sock, fds)) {
message->endpoint.flags = IPV6 | SECURED | TCP; message->endpoint.flags = IPV6 | SECURED | TCP;
if (accept_new_session(dev, dev->tcp.secure_sock, fds, &message->endpoint) < if (accept_new_session(dev, dev->tcp.secure_sock, fds, &message->endpoint) <
0) { 0) {
OC_ERR("accept new session fail"); OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} }
ret_with_code(TCP_STATUS_ACCEPT); ret_with_code(ADAPTER_STATUS_ACCEPT);
#endif /* OC_SECURITY */ #endif /* OC_SECURITY */
#ifdef OC_IPV4 #ifdef OC_IPV4
} else if (FD_ISSET(dev->tcp.server4_sock, fds)) { } else if (FD_ISSET(dev->tcp.server4_sock, fds)) {
...@@ -336,35 +336,35 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) ...@@ -336,35 +336,35 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
if (accept_new_session(dev, dev->tcp.server4_sock, fds, if (accept_new_session(dev, dev->tcp.server4_sock, fds,
&message->endpoint) < 0) { &message->endpoint) < 0) {
OC_ERR("accept new session fail"); OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} }
ret_with_code(TCP_STATUS_ACCEPT); ret_with_code(ADAPTER_STATUS_ACCEPT);
#ifdef OC_SECURITY #ifdef OC_SECURITY
} else if (FD_ISSET(dev->tcp.secure4_sock, fds)) { } else if (FD_ISSET(dev->tcp.secure4_sock, fds)) {
message->endpoint.flags = IPV4 | SECURED | TCP; message->endpoint.flags = IPV4 | SECURED | TCP;
if (accept_new_session(dev, dev->tcp.secure4_sock, fds, if (accept_new_session(dev, dev->tcp.secure4_sock, fds,
&message->endpoint) < 0) { &message->endpoint) < 0) {
OC_ERR("accept new session fail"); OC_ERR("accept new session fail");
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} }
ret_with_code(TCP_STATUS_ACCEPT); ret_with_code(ADAPTER_STATUS_ACCEPT);
#endif /* OC_SECURITY */ #endif /* OC_SECURITY */
#endif /* OC_IPV4 */ #endif /* OC_IPV4 */
} else if (FD_ISSET(dev->tcp.connect_pipe[0], fds)) { } else if (FD_ISSET(dev->tcp.connect_pipe[0], fds)) {
ssize_t len = read(dev->tcp.connect_pipe[0], message->data, OC_PDU_SIZE); ssize_t len = read(dev->tcp.connect_pipe[0], message->data, OC_PDU_SIZE);
if (len < 0) { if (len < 0) {
OC_ERR("read error! %d", errno); OC_ERR("read error! %d", errno);
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} }
FD_CLR(dev->tcp.connect_pipe[0], fds); FD_CLR(dev->tcp.connect_pipe[0], fds);
ret_with_code(TCP_STATUS_NONE); ret_with_code(ADAPTER_STATUS_NONE);
} }
// find session. // find session.
tcp_session_t *session = get_ready_to_read_session(fds); tcp_session_t *session = get_ready_to_read_session(fds);
if (!session) { if (!session) {
OC_DBG("could not find TCP session socket in fd set"); OC_DBG("could not find TCP session socket in fd set");
ret_with_code(TCP_STATUS_NONE); ret_with_code(ADAPTER_STATUS_NONE);
} }
// receive message. // receive message.
...@@ -379,13 +379,13 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) ...@@ -379,13 +379,13 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
free_tcp_session(session); free_tcp_session(session);
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} else if (count == 0) { } else if (count == 0) {
OC_DBG("peer closed TCP session\n"); OC_DBG("peer closed TCP session\n");
free_tcp_session(session); free_tcp_session(session);
ret_with_code(TCP_STATUS_NONE); ret_with_code(ADAPTER_STATUS_NONE);
} }
OC_DBG("recv(): %d bytes.", count); OC_DBG("recv(): %d bytes.", count);
...@@ -399,7 +399,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) ...@@ -399,7 +399,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
OC_ERR("total receive length(%ld) is bigger than max pdu size(%ld)", OC_ERR("total receive length(%ld) is bigger than max pdu size(%ld)",
total_length, (OC_MAX_APP_DATA_SIZE + COAP_MAX_HEADER_SIZE)); total_length, (OC_MAX_APP_DATA_SIZE + COAP_MAX_HEADER_SIZE));
OC_ERR("It may occur buffer overflow."); OC_ERR("It may occur buffer overflow.");
ret_with_code(TCP_STATUS_ERROR); ret_with_code(ADAPTER_STATUS_ERROR);
} }
OC_DBG("tcp packet total length : %ld bytes.", total_length); OC_DBG("tcp packet total length : %ld bytes.", total_length);
...@@ -410,7 +410,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message) ...@@ -410,7 +410,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
memcpy(&message->endpoint, &session->endpoint, sizeof(oc_endpoint_t)); memcpy(&message->endpoint, &session->endpoint, sizeof(oc_endpoint_t));
FD_CLR(session->sock, fds); FD_CLR(session->sock, fds);
ret = TCP_STATUS_RECEIVE; ret = ADAPTER_STATUS_RECEIVE;
oc_tcp_receive_message_done: oc_tcp_receive_message_done:
pthread_mutex_unlock(&dev->tcp.mutex); pthread_mutex_unlock(&dev->tcp.mutex);
......
...@@ -29,13 +29,6 @@ extern "C" ...@@ -29,13 +29,6 @@ extern "C"
{ {
#endif #endif
typedef enum {
TCP_STATUS_NONE = 0,
TCP_STATUS_ACCEPT,
TCP_STATUS_RECEIVE,
TCP_STATUS_ERROR
} tcp_receive_state_t;
int oc_tcp_connectivity_init(ip_context_t *dev); int oc_tcp_connectivity_init(ip_context_t *dev);
void oc_tcp_connectivity_shutdown(ip_context_t *dev); void oc_tcp_connectivity_shutdown(ip_context_t *dev);
...@@ -47,8 +40,8 @@ void oc_tcp_add_socks_to_fd_set(ip_context_t *dev); ...@@ -47,8 +40,8 @@ void oc_tcp_add_socks_to_fd_set(ip_context_t *dev);
void oc_tcp_set_session_fds(fd_set *fds); void oc_tcp_set_session_fds(fd_set *fds);
tcp_receive_state_t oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, adapter_receive_state_t oc_tcp_receive_message(ip_context_t *dev, fd_set *fds,
oc_message_t *message); oc_message_t *message);
void oc_tcp_end_session(ip_context_t *dev, oc_endpoint_t *endpoint); void oc_tcp_end_session(ip_context_t *dev, oc_endpoint_t *endpoint);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment