Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 794
1 file changed, 220 insertions(+), 574 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f5868d..fb1485dc6736 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -82,15 +82,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 				struct sk_buff *buf);
 static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
 				 struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
 
 /*
  * Simple link routines
@@ -335,13 +333,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 
 	spin_lock_bh(&tipc_port_list_lock);
 	p_ptr = tipc_port_lock(origport);
 	if (p_ptr) {
 		if (!list_empty(&p_ptr->wait_list))
 			goto exit;
-		p_ptr->congested = 1;
+		tsk = tipc_port_to_sock(p_ptr);
+		tsk->link_cong = 1;
 		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
 		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
 		l_ptr->stats.link_congs++;
@@ -355,6 +355,7 @@ exit:
 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 	struct tipc_port *temp_p_ptr;
 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 
@@ -370,10 +371,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 			    wait_list) {
 		if (win <= 0)
 			break;
+		tsk = tipc_port_to_sock(p_ptr);
 		list_del_init(&p_ptr->wait_list);
 		spin_lock_bh(p_ptr->lock);
-		p_ptr->congested = 0;
-		tipc_port_wakeup(p_ptr);
+		tsk->link_cong = 0;
+		tipc_sock_wakeup(tsk);
 		win -= p_ptr->waiting_pkts;
 		spin_unlock_bh(p_ptr->lock);
 	}
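The wait-list bookkeeping in link_schedule_port() above sizes each blocked sender with the classic ceiling-division idiom: waiting_pkts = 1 + ((sz - 1) / max_pkt) is the number of max_pkt-sized packets needed to carry sz bytes. A standalone illustration of the arithmetic, with made-up values (this is not TIPC code):

#include <stdio.h>

/* 1 + ((sz - 1) / max_pkt) rounds up for any sz >= 1:
 * 1..1500 bytes -> 1 packet, 1501..3000 -> 2 packets, and so on.
 */
int main(void)
{
	unsigned int max_pkt = 1500;	/* hypothetical link MTU */
	unsigned int sz;

	for (sz = 1; sz <= 4500; sz += 1499)
		printf("sz=%4u -> waiting_pkts=%u\n",
		       sz, 1 + ((sz - 1) / max_pkt));
	return 0;
}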
@@ -676,178 +678,142 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKCONG
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
  */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-			   struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
-	struct tipc_msg *bundler_msg = buf_msg(bundler);
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 bundle_size = msg_size(bundler_msg);
-	u32 to_pos = align(bundle_size);
-	u32 pad = to_pos - bundle_size;
-
-	if (msg_user(bundler_msg) != MSG_BUNDLER)
-		return 0;
-	if (msg_type(bundler_msg) != OPEN_MSG)
-		return 0;
-	if (skb_tailroom(bundler) < (pad + size))
-		return 0;
-	if (l_ptr->max_pkt < (to_pos + size))
-		return 0;
-
-	skb_put(bundler, pad + size);
-	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-	msg_set_size(bundler_msg, to_pos + size);
-	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	kfree_skb(buf);
-	l_ptr->stats.sent_bundled++;
-	return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-				 struct sk_buff *buf,
-				 struct tipc_msg *msg)
-{
-	u32 ack = mod(l_ptr->next_in_no - 1);
-	u32 seqno = mod(l_ptr->next_out_no++);
+	uint psz = msg_size(msg);
+	uint imp = tipc_msg_tot_importance(msg);
+	u32 oport = msg_tot_origport(msg);
 
-	msg_set_word(msg, 2, ((ack << 16) | seqno));
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	buf->next = NULL;
-	if (l_ptr->first_out) {
-		l_ptr->last_out->next = buf;
-		l_ptr->last_out = buf;
-	} else
-		l_ptr->first_out = l_ptr->last_out = buf;
-
-	l_ptr->out_queue_size++;
-	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-				       struct sk_buff *buf_chain,
-				       u32 long_msgno)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
-
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf_chain;
-	while (buf_chain) {
-		buf = buf_chain;
-		buf_chain = buf_chain->next;
-
-		msg = buf_msg(buf);
-		msg_set_long_msgno(msg, long_msgno);
-		link_add_to_outqueue(l_ptr, buf, msg);
+	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+			link_schedule_port(link, oport, psz);
+			return -ELINKCONG;
+		}
+	} else {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
 	}
+	kfree_skb_list(buf);
+	return -EHOSTUNREACH;
 }
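The two error codes of tipc_link_cong() imply different buffer ownership: -ELINKCONG keeps the buffer and parks the sending port on the link's wait list, while -EHOSTUNREACH frees the whole chain before returning. A sketch of caller-side handling, where wait_for_link_wakeup() is a hypothetical stand-in for the socket-level wait that ends when tipc_link_wakeup_ports() clears tsk->link_cong:

/* Sketch only; the real retry logic lives in the socket layer
 * (tipc_send_stream()/tipc_send_packet()). Caller holds the node lock.
 */
extern int wait_for_link_wakeup(void);	/* hypothetical helper */

static int example_send(struct tipc_link *link, struct sk_buff *buf)
{
	int rc = __tipc_link_xmit(link, buf);

	if (rc == -ELINKCONG)			/* buffer kept, sender parked */
		return wait_for_link_wakeup();	/* then retry the send */
	if (rc == -EHOSTUNREACH)		/* chain already freed */
		return rc;			/* must not touch buf again */
	return rc;				/* 0: chain accepted by link */
}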
 
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
+/**
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 dsz = msg_data_sz(msg);
-	u32 queue_size = l_ptr->out_queue_size;
-	u32 imp = tipc_msg_tot_importance(msg);
-	u32 queue_limit = l_ptr->queue_limit[imp];
-	u32 max_packet = l_ptr->max_pkt;
-
-	/* Match msg importance against queue limits: */
-	if (unlikely(queue_size >= queue_limit)) {
-		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
-			link_schedule_port(l_ptr, msg_origport(msg), size);
-			kfree_skb(buf);
-			return -ELINKCONG;
-		}
-		kfree_skb(buf);
-		if (imp > CONN_MANAGER) {
-			pr_warn("%s<%s>, send queue full", link_rst_msg,
-				l_ptr->name);
-			tipc_link_reset(l_ptr);
-		}
-		return dsz;
+	uint psz = msg_size(msg);
+	uint qsz = link->out_queue_size;
+	uint sndlim = link->queue_limit[0];
+	uint imp = tipc_msg_tot_importance(msg);
+	uint mtu = link->max_pkt;
+	uint ack = mod(link->next_in_no - 1);
+	uint seqno = link->next_out_no;
+	uint bc_last_in = link->owner->bclink.last_in;
+	struct tipc_media_addr *addr = &link->media_addr;
+	struct sk_buff *next = buf->next;
+
+	/* Match queue limits against msg importance: */
+	if (unlikely(qsz >= link->queue_limit[imp]))
+		return tipc_link_cong(link, buf);
+
+	/* Has valid packet limit been used ? */
+	if (unlikely(psz > mtu)) {
+		kfree_skb_list(buf);
+		return -EMSGSIZE;
 	}
 
-	/* Fragmentation needed ? */
-	if (size > max_packet)
-		return tipc_link_frag_xmit(l_ptr, buf);
-
-	/* Packet can be queued or sent. */
-	if (likely(!link_congested(l_ptr))) {
-		link_add_to_outqueue(l_ptr, buf, msg);
+	/* Prepare each packet for sending, and add to outqueue: */
+	while (buf) {
+		next = buf->next;
+		msg = buf_msg(buf);
+		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_bcast_ack(msg, bc_last_in);
+
+		if (!link->first_out) {
+			link->first_out = buf;
+		} else if (qsz < sndlim) {
+			link->last_out->next = buf;
+		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+			link->stats.sent_bundled++;
+			buf = next;
+			next = buf->next;
+			continue;
+		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+			link->stats.sent_bundled++;
+			link->stats.sent_bundles++;
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		} else {
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		}
 
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->unacked_window = 0;
-		return dsz;
-	}
-	/* Congestion: can message be bundled ? */
-	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
-	    (msg_user(msg) != MSG_FRAGMENTER)) {
-
-		/* Try adding message to an existing bundle */
-		if (l_ptr->next_out &&
-		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
-			return dsz;
-
-		/* Try creating a new bundle */
-		if (size <= max_packet * 2 / 3) {
-			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
-			struct tipc_msg bundler_hdr;
-
-			if (bundler) {
-				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
-					      INT_H_SIZE, l_ptr->addr);
-				skb_copy_to_linear_data(bundler, &bundler_hdr,
-							INT_H_SIZE);
-				skb_trim(bundler, INT_H_SIZE);
-				link_bundle_buf(l_ptr, bundler, buf);
-				buf = bundler;
-				msg = buf_msg(buf);
-				l_ptr->stats.sent_bundles++;
-			}
+		/* Send packet if possible: */
+		if (likely(++qsz <= sndlim)) {
+			tipc_bearer_send(link->bearer_id, buf, addr);
+			link->next_out = next;
+			link->unacked_window = 0;
 		}
+		seqno++;
+		link->last_out = buf;
+		buf = next;
 	}
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf;
-	link_add_to_outqueue(l_ptr, buf, msg);
-	return dsz;
+	link->next_out_no = seqno;
+	link->out_queue_size = qsz;
+	return 0;
 }
 
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
+/**
+ * tipc_link_xmit() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 {
-	struct tipc_link *l_ptr;
-	struct tipc_node *n_ptr;
-	int res = -ELINKCONG;
+	struct tipc_link *link = NULL;
+	struct tipc_node *node;
+	int rc = -EHOSTUNREACH;
 
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[selector & 1];
-		if (l_ptr)
-			res = __tipc_link_xmit(l_ptr, buf);
-		else
-			kfree_skb(buf);
-		tipc_node_unlock(n_ptr);
-	} else {
-		kfree_skb(buf);
+	node = tipc_node_find(dnode);
+	if (node) {
+		tipc_node_lock(node);
+		link = node->active_links[selector & 1];
+		if (link)
+			rc = __tipc_link_xmit(link, buf);
+		tipc_node_unlock(node);
 	}
-	return res;
+
+	if (link)
+		return rc;
+
+	if (likely(in_own_node(dnode)))
+		return tipc_sk_rcv(buf);
+
+	kfree_skb_list(buf);
+	return rc;
 }
 
 /*
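The new send loop delegates bundling to tipc_msg_bundle() and tipc_msg_make_bundle(), helpers added in net/tipc/msg.c by a companion patch and not shown in this diff. As a schematic model only, the fit test can be pictured like this, reusing the alignment rule visible in the removed link_bundle_buf():

/* Schematic model, not the real helper: a message fits in an open
 * bundle if the 4-byte aligned append offset plus its size stays
 * within both the skb tailroom and the link MTU. Assumes TIPC's
 * align() and message accessors from net/tipc/msg.h.
 */
static bool bundle_has_room(struct sk_buff *bundle, struct tipc_msg *msg,
			    u32 mtu)
{
	struct tipc_msg *bmsg = buf_msg(bundle);
	u32 start = align(msg_size(bmsg));	/* aligned append offset */
	u32 pad = start - msg_size(bmsg);

	if (msg_user(bmsg) != MSG_BUNDLER)
		return false;		/* tail packet is not a bundle */
	if (skb_tailroom(bundle) < pad + msg_size(msg))
		return false;		/* no room left in the buffer */
	if (mtu < start + msg_size(msg))
		return false;		/* would exceed the link MTU */
	return true;
}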
@@ -858,7 +824,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
  *
  * Called with node locked
  */
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
 {
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
@@ -868,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
 		return;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
-	msg_set_last_bcast(msg, l->owner->bclink.acked);
-	link_add_chain_to_outqueue(l, buf, 0);
-	tipc_link_push_queue(l);
+	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+	msg_set_last_bcast(msg, link->owner->bclink.acked);
+	__tipc_link_xmit(link, buf);
 }
 
 /*
@@ -892,293 +857,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 }
 
 /*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct sk_buff *temp_buf;
-
-	if (list_empty(message_list))
-		return;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[0];
-		if (l_ptr) {
-			/* convert circular list to linear list */
-			((struct sk_buff *)message_list->prev)->next = NULL;
-			link_add_chain_to_outqueue(l_ptr,
-				(struct sk_buff *)message_list->next, 0);
-			tipc_link_push_queue(l_ptr);
-			INIT_LIST_HEAD(message_list);
-		}
-		tipc_node_unlock(n_ptr);
-	}
-
-	/* discard the messages if they couldn't be sent */
-	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-		list_del((struct list_head *)buf);
-		kfree_skb(buf);
-	}
-}
-
-/*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-			       u32 *used_max_pkt)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	int res = msg_data_sz(msg);
-
-	if (likely(!link_congested(l_ptr))) {
-		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-			link_add_to_outqueue(l_ptr, buf, msg);
-			tipc_bearer_send(l_ptr->bearer_id, buf,
-					 &l_ptr->media_addr);
-			l_ptr->unacked_window = 0;
-			return res;
-		}
-		else
-			*used_max_pkt = l_ptr->max_pkt;
-	}
-	return __tipc_link_xmit(l_ptr, buf);  /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destaddr)
-{
-	struct tipc_msg *hdr = &sender->phdr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct tipc_node *node;
-	int res;
-	u32 selector = msg_origport(hdr) & 1;
-
-again:
-	/*
-	 * Try building message using port's max_pkt hint.
-	 * (Must not hold any locks while building message.)
-	 */
-	res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
-	/* Exit if build request was invalid */
-	if (unlikely(res < 0))
-		return res;
-
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[selector];
-		if (likely(l_ptr)) {
-			if (likely(buf)) {
-				res = tipc_link_xmit_fast(l_ptr, buf,
-							  &sender->max_pkt);
-exit:
-				tipc_node_unlock(node);
-				return res;
-			}
-
-			/* Exit if link (or bearer) is congested */
-			if (link_congested(l_ptr)) {
-				res = link_schedule_port(l_ptr,
-							 sender->ref, res);
-				goto exit;
-			}
-
-			/*
-			 * Message size exceeds max_pkt hint; update hint,
-			 * then re-try fast path or fragment the message
-			 */
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-
-
-			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
-				goto again;
-
-			return tipc_link_iovec_long_xmit(sender, msg_sect,
-							 len, destaddr);
-		}
-		tipc_node_unlock(node);
-	}
-
-	/* Couldn't find a link to the destination node */
-	kfree_skb(buf);
-	tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
-	return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destaddr)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *node;
-	struct tipc_msg *hdr = &sender->phdr;
-	u32 dsz = len;
-	u32 max_pkt, fragm_sz, rest;
-	struct tipc_msg fragm_hdr;
-	struct sk_buff *buf, *buf_chain, *prev;
-	u32 fragm_crs, fragm_rest, hsz, sect_rest;
-	const unchar __user *sect_crs;
-	int curr_sect;
-	u32 fragm_no;
-	int res = 0;
-
-again:
-	fragm_no = 1;
-	max_pkt = sender->max_pkt - INT_H_SIZE;
-	/* leave room for tunnel header in case of link changeover */
-	fragm_sz = max_pkt - INT_H_SIZE;
-	/* leave room for fragmentation header in each fragment */
-	rest = dsz;
-	fragm_crs = 0;
-	fragm_rest = 0;
-	sect_rest = 0;
-	sect_crs = NULL;
-	curr_sect = -1;
-
-	/* Prepare reusable fragment header */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, msg_destnode(hdr));
-	msg_set_size(&fragm_hdr, max_pkt);
-	msg_set_fragm_no(&fragm_hdr, 1);
-
-	/* Prepare header of first fragment */
-	buf_chain = buf = tipc_buf_acquire(max_pkt);
-	if (!buf)
-		return -ENOMEM;
-	buf->next = NULL;
-	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-	hsz = msg_hdr_sz(hdr);
-	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
-	/* Chop up message */
-	fragm_crs = INT_H_SIZE + hsz;
-	fragm_rest = fragm_sz - hsz;
-
-	do {		/* For all sections */
-		u32 sz;
-
-		if (!sect_rest) {
-			sect_rest = msg_sect[++curr_sect].iov_len;
-			sect_crs = msg_sect[curr_sect].iov_base;
-		}
-
-		if (sect_rest < fragm_rest)
-			sz = sect_rest;
-		else
-			sz = fragm_rest;
-
-		if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
-			res = -EFAULT;
-error:
-			kfree_skb_list(buf_chain);
-			return res;
-		}
-		sect_crs += sz;
-		sect_rest -= sz;
-		fragm_crs += sz;
-		fragm_rest -= sz;
-		rest -= sz;
-
-		if (!fragm_rest && rest) {
-
-			/* Initiate new fragment: */
-			if (rest <= fragm_sz) {
-				fragm_sz = rest;
-				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-			} else {
-				msg_set_type(&fragm_hdr, FRAGMENT);
-			}
-			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
-			prev = buf;
-			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-			if (!buf) {
-				res = -ENOMEM;
-				goto error;
-			}
-
-			buf->next = NULL;
-			prev->next = buf;
-			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-			fragm_crs = INT_H_SIZE;
-			fragm_rest = fragm_sz;
-		}
-	} while (rest > 0);
-
-	/*
-	 * Now we have a buffer chain. Select a link and check
-	 * that packet size is still OK
-	 */
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[sender->ref & 1];
-		if (!l_ptr) {
-			tipc_node_unlock(node);
-			goto reject;
-		}
-		if (l_ptr->max_pkt < max_pkt) {
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-			kfree_skb_list(buf_chain);
-			goto again;
-		}
-	} else {
-reject:
-		kfree_skb_list(buf_chain);
-		tipc_port_iovec_reject(sender, hdr, msg_sect, len,
-				       TIPC_ERR_NO_NODE);
-		return -ENETUNREACH;
-	}
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-	tipc_node_unlock(node);
-	return dsz;
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
@@ -1238,7 +916,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
 		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
+			msg_set_type(msg, BUNDLE_CLOSED);
 		l_ptr->next_out = buf->next;
 		return 0;
 	}
@@ -1527,11 +1205,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
 			tipc_link_wakeup_ports(l_ptr, 0);
 
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
-			l_ptr->stats.sent_acks++;
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-		}
-
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {
 			if (msg_user(msg) == LINK_PROTOCOL) {
@@ -1565,57 +1238,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(l_ptr->oldest_deferred_in))
 			head = link_insert_deferred_queue(l_ptr, head);
 
-		/* Deliver packet/message to correct user: */
-		if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) {
-			if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
-				tipc_node_unlock(n_ptr);
-				continue;
-			}
-			msg = buf_msg(buf);
-		} else if (msg_user(msg) == MSG_FRAGMENTER) {
-			l_ptr->stats.recv_fragments++;
-			if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) {
-				l_ptr->stats.recv_fragmented++;
-				msg = buf_msg(buf);
-			} else {
-				if (!l_ptr->reasm_buf)
-					tipc_link_reset(l_ptr);
-				tipc_node_unlock(n_ptr);
-				continue;
-			}
+		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+			l_ptr->stats.sent_acks++;
+			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
 
-		switch (msg_user(msg)) {
-		case TIPC_LOW_IMPORTANCE:
-		case TIPC_MEDIUM_IMPORTANCE:
-		case TIPC_HIGH_IMPORTANCE:
-		case TIPC_CRITICAL_IMPORTANCE:
+		if (tipc_link_prepare_input(l_ptr, &buf)) {
 			tipc_node_unlock(n_ptr);
-			tipc_sk_rcv(buf);
 			continue;
-		case MSG_BUNDLER:
-			l_ptr->stats.recv_bundles++;
-			l_ptr->stats.recv_bundled += msg_msgcnt(msg);
-			tipc_node_unlock(n_ptr);
-			tipc_link_bundle_rcv(buf);
-			continue;
-		case NAME_DISTRIBUTOR:
-			n_ptr->bclink.recv_permitted = true;
-			tipc_node_unlock(n_ptr);
-			tipc_named_rcv(buf);
-			continue;
-		case CONN_MANAGER:
-			tipc_node_unlock(n_ptr);
-			tipc_port_proto_rcv(buf);
-			continue;
-		case BCAST_PROTOCOL:
-			tipc_link_sync_rcv(n_ptr, buf);
-			break;
-		default:
-			kfree_skb(buf);
-			break;
 		}
 		tipc_node_unlock(n_ptr);
+		msg = buf_msg(buf);
+		if (tipc_link_input(l_ptr, buf) != 0)
+			goto discard;
 		continue;
 unlock_discard:
 		tipc_node_unlock(n_ptr);
@@ -1625,6 +1260,80 @@ discard:
 }
 
 /**
+ * tipc_link_prepare_input - process TIPC link messages
+ *
+ * returns nonzero if the message was consumed
+ *
+ * Node lock must be held
+ */
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
+{
+	struct tipc_node *n;
+	struct tipc_msg *msg;
+	int res = -EINVAL;
+
+	n = l->owner;
+	msg = buf_msg(*buf);
+	switch (msg_user(msg)) {
+	case CHANGEOVER_PROTOCOL:
+		if (tipc_link_tunnel_rcv(n, buf))
+			res = 0;
+		break;
+	case MSG_FRAGMENTER:
+		l->stats.recv_fragments++;
+		if (tipc_buf_append(&l->reasm_buf, buf)) {
+			l->stats.recv_fragmented++;
+			res = 0;
+		} else if (!l->reasm_buf) {
+			tipc_link_reset(l);
+		}
+		break;
+	case MSG_BUNDLER:
+		l->stats.recv_bundles++;
+		l->stats.recv_bundled += msg_msgcnt(msg);
+		res = 0;
+		break;
+	case NAME_DISTRIBUTOR:
+		n->bclink.recv_permitted = true;
+		res = 0;
+		break;
+	case BCAST_PROTOCOL:
+		tipc_link_sync_rcv(n, *buf);
+		break;
+	default:
+		res = 0;
+	}
+	return res;
+}
+/**
+ * tipc_link_input - Deliver message to higher layers
+ */
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	int res = 0;
+
+	switch (msg_user(msg)) {
+	case TIPC_LOW_IMPORTANCE:
+	case TIPC_MEDIUM_IMPORTANCE:
+	case TIPC_HIGH_IMPORTANCE:
+	case TIPC_CRITICAL_IMPORTANCE:
+	case CONN_MANAGER:
+		tipc_sk_rcv(buf);
+		break;
+	case NAME_DISTRIBUTOR:
+		tipc_named_rcv(buf);
+		break;
+	case MSG_BUNDLER:
+		tipc_link_bundle_rcv(buf);
+		break;
+	default:
+		res = -EINVAL;
+	}
+	return res;
+}
+
+/**
  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
  *
  * Returns increase in queue length (i.e. 0 or 1)
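Reception is now a two-stage affair: tipc_link_prepare_input() runs under the node lock and handles link-internal users (tunnel, reassembly, bundle and name-table bookkeeping), while tipc_link_input() delivers to the upper layers once the lock is dropped. A condensed model of the calling contract in tipc_rcv(), with sequence-number checks and the deferred queue omitted:

/* Sketch of the two-stage dispatch used by tipc_rcv() after this patch. */
static void example_deliver(struct tipc_node *n, struct tipc_link *l,
			    struct sk_buff *buf)
{
	tipc_node_lock(n);
	if (tipc_link_prepare_input(l, &buf)) {
		/* nonzero: message was consumed under the lock */
		tipc_node_unlock(n);
		return;
	}
	tipc_node_unlock(n);
	if (tipc_link_input(l, buf) != 0)
		kfree_skb(buf);		/* unknown user: discard */
}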
@@ -2217,6 +1926,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 	u32 msgcount = msg_msgcnt(buf_msg(buf));
 	u32 pos = INT_H_SIZE;
 	struct sk_buff *obuf;
+	struct tipc_msg *omsg;
 
 	while (msgcount--) {
 		obuf = buf_extract(buf, pos);
@@ -2224,82 +1934,18 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 			pr_warn("Link unable to unbundle message(s)\n");
 			break;
 		}
-		pos += align(msg_size(buf_msg(obuf)));
-		tipc_net_route_msg(obuf);
-	}
-	kfree_skb(buf);
-}
-
-/*
- *  Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct sk_buff *buf_chain = NULL;
-	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
-	struct tipc_msg *inmsg = buf_msg(buf);
-	struct tipc_msg fragm_hdr;
-	u32 insize = msg_size(inmsg);
-	u32 dsz = msg_data_sz(inmsg);
-	unchar *crs = buf->data;
-	u32 rest = insize;
-	u32 pack_sz = l_ptr->max_pkt;
-	u32 fragm_sz = pack_sz - INT_H_SIZE;
-	u32 fragm_no = 0;
-	u32 destaddr;
-
-	if (msg_short(inmsg))
-		destaddr = l_ptr->addr;
-	else
-		destaddr = msg_destnode(inmsg);
-
-	/* Prepare reusable fragment header: */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, destaddr);
-
-	/* Chop up message: */
-	while (rest > 0) {
-		struct sk_buff *fragm;
-
-		if (rest <= fragm_sz) {
-			fragm_sz = rest;
-			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-		}
-		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-		if (fragm == NULL) {
-			kfree_skb(buf);
-			kfree_skb_list(buf_chain);
-			return -ENOMEM;
+		omsg = buf_msg(obuf);
+		pos += align(msg_size(omsg));
+		if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
+			tipc_sk_rcv(obuf);
+		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
+			tipc_named_rcv(obuf);
+		} else {
+			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
+			kfree_skb(obuf);
 		}
-		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-		fragm_no++;
-		msg_set_fragm_no(&fragm_hdr, fragm_no);
-		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
-					       fragm_sz);
-		buf_chain_tail->next = fragm;
-		buf_chain_tail = fragm;
-
-		rest -= fragm_sz;
-		crs += fragm_sz;
-		msg_set_type(&fragm_hdr, FRAGMENT);
 	}
 	kfree_skb(buf);
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-
-	return dsz;
 }
 
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
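The rewritten tipc_link_bundle_rcv() dispatches each inner message directly to the socket or name-table layer instead of going through tipc_net_route_msg(). Inner messages sit back to back behind the bundle header, each starting on a 4-byte boundary, which is what the pos += align(msg_size(omsg)) step walks. A standalone illustration of the offset arithmetic, with made-up message sizes:

#include <stdio.h>

/* Bundle layout walk: first message right after the 40-byte internal
 * header (INT_H_SIZE), each following offset padded to a multiple of 4.
 * Sizes are invented for illustration.
 */
#define ALIGN4(x)  (((x) + 3u) & ~3u)
#define INT_H_SIZE 40u

int main(void)
{
	unsigned int sizes[] = { 61, 100, 27 };
	unsigned int pos = INT_H_SIZE;
	unsigned int i;

	for (i = 0; i < 3; i++) {
		printf("msg %u: offset %u, size %u\n", i, pos, sizes[i]);
		pos += ALIGN4(sizes[i]);
	}
	return 0;
}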