node_bulk.c
/* -*- C -*- */
/*
 * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * For any questions about this software or licensing,
 * please email opensource@seagate.com or cortx-questions@seagate.com.
 *
 */


#include "lib/arith.h"            /* M0_SWAP */
#include "lib/errno.h"            /* EALREADY */
#include "lib/memory.h"           /* M0_ALLOC_PTR */
#include "lib/misc.h"             /* M0_IN */
#include "lib/atomic.h"           /* m0_atomic64 */

#include "motr/magic.h"           /* M0_NET_TEST_BSB_MAGIC */

#include "net/test/network.h"     /* m0_net_test_network_ctx */
#include "net/test/node.h"        /* m0_net_test_node_ctx */
#include "net/test/node_helper.h" /* m0_net_test_node_ctx */
#include "net/test/service.h"     /* m0_net_test_service */
#include "net/test/ringbuf.h"     /* m0_net_test_ringbuf */

#include "net/test/node_bulk.h"

#define NET_TEST_MODULE_NAME node_bulk
#include "net/test/debug.h"       /* LOGD */

enum transfer_state {
        TS_UNUSED = 0,
        /* test client states */
        TS_QUEUED,
        TS_BD_SENT,
        TS_CB_LEFT2,
        TS_CB_LEFT1,
        TS_FAILED2,
        TS_FAILED1,
        /* test server states */
        TS_BD_RECEIVED,
        TS_RECEIVING,
        TS_SENDING,
        /* final states */
        TS_TRANSFERRED,
        TS_FAILED,
        TS_BADMSG,
};

/* return codes of the enqueue function and of the buffer callback */
struct buf_status_errno {
        int bse_func_rc;
        int bse_cb_rc;
};

/* status of a single bulk transfer */
struct buf_status_bulk {
        uint64_t                bsb_magic;
        /* current state of the transfer */
        enum transfer_state     bsb_ts;
        size_t                  bsb_index;
        struct m0_tlink         bsb_tlink;
        /* status of the message carrying the buffer descriptors */
        struct buf_status_errno bsb_msg;
        struct buf_status_errno bsb_send;
        struct buf_status_errno bsb_recv;
        /* network buffer descriptor to be sent to the test server */
        struct m0_net_buf_desc  bsb_desc_send;
        m0_time_t               bsb_time_start;
        m0_time_t               bsb_time_finish;
};

/* status of a single ping (message) buffer */
struct buf_status_ping {
        uint64_t        bsp_magic;
        size_t          bsp_index;
        /* bulk buffer statuses whose descriptors are in this message */
        struct m0_tl    bsp_buffers;
        struct m0_tlink bsp_tlink;
};

/* per test server status */
struct server_status_bulk {
        uint64_t        ssb_magic;
        size_t          ssb_index;
        struct m0_tlink ssb_tlink;
        /* bulk buffer statuses queued for this server */
        struct m0_tl    ssb_buffers;
};

/* single allowed state transition */
struct state_transition {
        enum transfer_state sta_from;
        enum transfer_state sta_to;
};

M0_TL_DESCR_DEFINE(bsb, "buf_status_bulk", static,
                   struct buf_status_bulk, bsb_tlink, bsb_magic,
                   M0_NET_TEST_BSB_MAGIC, M0_NET_TEST_BSB_HEAD_MAGIC);
M0_TL_DEFINE(bsb, static, struct buf_status_bulk);

M0_TL_DESCR_DEFINE(ssb, "server_status_bulk", static,
                   struct server_status_bulk, ssb_tlink, ssb_magic,
                   M0_NET_TEST_SSB_MAGIC, M0_NET_TEST_SSB_HEAD_MAGIC);
M0_TL_DEFINE(ssb, static, struct server_status_bulk);

M0_TL_DESCR_DEFINE(bsp, "buf_status_ping", static,
                   struct buf_status_ping, bsp_tlink, bsp_magic,
                   M0_NET_TEST_BSP_MAGIC, M0_NET_TEST_BSP_HEAD_MAGIC);
M0_TL_DEFINE(bsp, static, struct buf_status_ping);

/* bulk test node context */
struct node_bulk_ctx {
        /* node helper (logic common to all test nodes) */
        struct m0_net_test_nh          nbc_nh;
        /* network context for testing */
        struct m0_net_test_network_ctx nbc_net;
        /* test service */
        struct m0_net_test_service    *nbc_svc;
        /* worker thread */
        struct m0_thread               nbc_thread;
        /* number of ping (message) buffers */
        size_t                         nbc_buf_ping_nr;
        /* number of bulk buffers */
        size_t                         nbc_buf_bulk_nr;
        /* size of a ping buffer */
        m0_bcount_t                    nbc_buf_size_ping;
        /* size of a bulk buffer */
        m0_bcount_t                    nbc_buf_size_bulk;
        /* maximum number of buffer descriptors in a ping buffer */
        size_t                         nbc_bd_nr_max;
        /* bulk transfer statuses */
        struct buf_status_bulk        *nbc_bs;
        /* number of bulk transfer statuses */
        size_t                         nbc_bs_nr;
        /* ping buffer statuses (test client only) */
        struct buf_status_ping        *nbc_bsp;
        /* synchronizes node_bulk_cb() with node_bulk_worker() */
        struct m0_mutex                nbc_bulk_mutex;
        /* indices of unused ping buffers */
        struct m0_net_test_ringbuf     nbc_rb_ping_unused;
        /* indices of unused bulk transfer statuses */
        struct m0_net_test_ringbuf     nbc_rb_bulk_unused;
        /* indices of queued bulk transfers (test client only) */
        struct m0_net_test_ringbuf     nbc_rb_bulk_queued;
        /* indices of transfers in a final state */
        struct m0_net_test_ringbuf     nbc_rb_bulk_final;
        /* transfer concurrency per server (test client only) */
        size_t                         nbc_client_concurrency;
        /* per test server statuses (test client only) */
        struct server_status_bulk     *nbc_sstatus;
        /* signalled on STOP command */
        struct m0_chan                 nbc_stop_chan;
        struct m0_mutex                nbc_stop_chan_mutex;
        struct m0_clink                nbc_stop_clink;
        /* set to 1 when the test should stop */
        struct m0_atomic64             nbc_stop_flag;
        /* a network buffer callback was executed */
        bool                           nbc_callback_executed;
};

static void sd_update(struct node_bulk_ctx *ctx,
                      enum m0_net_test_nh_msg_type type,
                      enum m0_net_test_nh_msg_status status,
                      enum m0_net_test_nh_msg_direction direction)
{
        m0_net_test_nh_sd_update(&ctx->nbc_nh, type, status, direction);
}

static void node_bulk_tm_event_cb(const struct m0_net_tm_event *ev)
{
        /* nothing for now */
}

static const struct m0_net_tm_callbacks node_bulk_tm_cb = {
        .ntc_event_cb = node_bulk_tm_event_cb
};

static struct node_bulk_ctx *
node_bulk_ctx_from_net_ctx(struct m0_net_test_network_ctx *net_ctx)
{
        return container_of(net_ctx, struct node_bulk_ctx, nbc_net);
}

static bool
node_bulk_state_change_allowed(enum transfer_state from,
                               enum transfer_state to,
                               const struct state_transition allowed[],
                               size_t allowed_size)
{
        size_t i;

        M0_PRE(allowed != NULL);
        M0_PRE(allowed_size > 0);

        for (i = 0; i < allowed_size; ++i) {
                if (allowed[i].sta_from == from && allowed[i].sta_to == to)
                        break;
        }
        return i < allowed_size;
}

static bool node_bulk_state_is_final(enum transfer_state state)
{
        return M0_IN(state, (TS_FAILED, TS_TRANSFERRED, TS_BADMSG));
}

static void node_bulk_state_change(struct node_bulk_ctx *ctx,
                                   size_t bs_index,
                                   enum transfer_state state)
{
        static const struct state_transition allowed_client[] = {
                { .sta_from = TS_UNUSED,      .sta_to = TS_QUEUED },
                { .sta_from = TS_QUEUED,      .sta_to = TS_BD_SENT },
                { .sta_from = TS_QUEUED,      .sta_to = TS_FAILED },
                { .sta_from = TS_QUEUED,      .sta_to = TS_FAILED1 },
                { .sta_from = TS_QUEUED,      .sta_to = TS_FAILED2 },
                { .sta_from = TS_BD_SENT,     .sta_to = TS_CB_LEFT2 },
                { .sta_from = TS_BD_SENT,     .sta_to = TS_FAILED2 },
                { .sta_from = TS_CB_LEFT2,    .sta_to = TS_CB_LEFT1 },
                { .sta_from = TS_CB_LEFT2,    .sta_to = TS_FAILED1 },
                { .sta_from = TS_CB_LEFT1,    .sta_to = TS_TRANSFERRED },
                { .sta_from = TS_CB_LEFT1,    .sta_to = TS_FAILED },
                { .sta_from = TS_FAILED2,     .sta_to = TS_FAILED1 },
                { .sta_from = TS_FAILED1,     .sta_to = TS_FAILED },
                { .sta_from = TS_TRANSFERRED, .sta_to = TS_UNUSED },
                { .sta_from = TS_FAILED,      .sta_to = TS_UNUSED },
        };
        static const struct state_transition allowed_server[] = {
                { .sta_from = TS_UNUSED,      .sta_to = TS_BD_RECEIVED },
                { .sta_from = TS_BD_RECEIVED, .sta_to = TS_BADMSG },
                { .sta_from = TS_BD_RECEIVED, .sta_to = TS_RECEIVING },
                { .sta_from = TS_RECEIVING,   .sta_to = TS_SENDING },
                { .sta_from = TS_RECEIVING,   .sta_to = TS_FAILED },
                { .sta_from = TS_SENDING,     .sta_to = TS_TRANSFERRED },
                { .sta_from = TS_SENDING,     .sta_to = TS_FAILED },
                { .sta_from = TS_TRANSFERRED, .sta_to = TS_UNUSED },
                { .sta_from = TS_FAILED,      .sta_to = TS_UNUSED },
                { .sta_from = TS_BADMSG,      .sta_to = TS_UNUSED },
        };
        enum m0_net_test_role   role;
        struct buf_status_bulk *bs;
        bool                    can_change;
        bool                    role_client;

        M0_PRE(ctx != NULL);
        M0_PRE(bs_index < ctx->nbc_bs_nr);
        M0_PRE(ctx->nbc_bs != NULL);

        LOGD("state_change: role = %d, bs_index = %lu, state = %d",
             ctx->nbc_nh.ntnh_role, bs_index, state);

        role        = ctx->nbc_nh.ntnh_role;
        bs          = &ctx->nbc_bs[bs_index];
        role_client = role == M0_NET_TEST_ROLE_CLIENT;
        can_change  = node_bulk_state_change_allowed(bs->bsb_ts, state,
                                role_client ? allowed_client : allowed_server,
                                role_client ? ARRAY_SIZE(allowed_client) :
                                              ARRAY_SIZE(allowed_server));
        M0_ASSERT(can_change);
        bs->bsb_ts = state;

        /* add to ringbufs if needed */
        if (state == TS_UNUSED)
                m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_unused, bs_index);
        if (node_bulk_state_is_final(state))
                m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_final, bs_index);
        /* set start & finish timestamp */
        if (M0_IN(state, (TS_RECEIVING, TS_QUEUED)))
                bs->bsb_time_start = m0_time_now();
        if (state == TS_TRANSFERRED)
                bs->bsb_time_finish = m0_time_now();
        /* reset buf_status_errno if needed */
        if (state == TS_UNUSED) {
                M0_SET0(&bs->bsb_msg);
                M0_SET0(&bs->bsb_send);
                M0_SET0(&bs->bsb_recv);
        }
}

static const struct state_transition node_bulk_client_success[] = {
        { .sta_from = TS_BD_SENT,  .sta_to = TS_CB_LEFT2 },
        { .sta_from = TS_CB_LEFT2, .sta_to = TS_CB_LEFT1 },
        { .sta_from = TS_CB_LEFT1, .sta_to = TS_TRANSFERRED },
        { .sta_from = TS_FAILED2,  .sta_to = TS_FAILED1 },
        { .sta_from = TS_FAILED1,  .sta_to = TS_FAILED },
};
static const struct state_transition node_bulk_client_failure[] = {
        { .sta_from = TS_BD_SENT,  .sta_to = TS_FAILED2 },
        { .sta_from = TS_CB_LEFT2, .sta_to = TS_FAILED1 },
        { .sta_from = TS_CB_LEFT1, .sta_to = TS_FAILED },
        { .sta_from = TS_FAILED2,  .sta_to = TS_FAILED1 },
        { .sta_from = TS_FAILED1,  .sta_to = TS_FAILED },
};
static const struct state_transition node_bulk_server_success[] = {
        { .sta_from = TS_RECEIVING, .sta_to = TS_SENDING },
        { .sta_from = TS_SENDING,   .sta_to = TS_TRANSFERRED },
};
static const struct state_transition node_bulk_server_failure[] = {
        { .sta_from = TS_RECEIVING, .sta_to = TS_FAILED },
        { .sta_from = TS_SENDING,   .sta_to = TS_FAILED },
};

static const struct {
        const struct state_transition *nbst_transition;
        const size_t                   nbst_nr;
} node_bulk_state_transitions[] = {
#define TRANSITION(name) {                      \
        .nbst_transition = name,                \
        .nbst_nr         = ARRAY_SIZE(name),    \
}
        TRANSITION(node_bulk_client_success),
        TRANSITION(node_bulk_client_failure),
        TRANSITION(node_bulk_server_success),
        TRANSITION(node_bulk_server_failure),
#undef TRANSITION
};

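/*
 * The state machines encoded in the tables above, in short.
 *
 * Test client, for each transfer (one passive bulk send/recv buffer pair):
 * UNUSED -> QUEUED (bulk buffers enqueued) -> BD_SENT (descriptors sent in
 * a ping message) -> CB_LEFT2 -> CB_LEFT1 (two, then one buffer callback
 * still expected) -> TRANSFERRED. Failures move a transfer to FAILED2 ->
 * FAILED1 -> FAILED, where the number suffix is the number of buffer
 * callbacks still expected before the failure becomes final.
 *
 * Test server, for each bulk buffer: UNUSED -> BD_RECEIVED -> RECEIVING
 * (active bulk recv) -> SENDING (active bulk send) -> TRANSFERRED, with
 * BD_RECEIVED -> BADMSG for undecodable descriptors and RECEIVING or
 * SENDING -> FAILED on errors.
 *
 * TRANSFERRED, FAILED and BADMSG are final (node_bulk_state_is_final());
 * node_bulk_state_transition_auto() accounts the result and returns the
 * status to UNUSED.
 */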
static void node_bulk_state_check(const struct state_transition state_list[],
                                  size_t state_nr)
{
        size_t i;
        size_t j;

        for (i = 0; i < state_nr; ++i) {
                for (j = i + 1; j < state_nr; ++j) {
                        M0_ASSERT(state_list[i].sta_from !=
                                  state_list[j].sta_from);
                }
        }
}

static void node_bulk_state_check_all(void)
{
        size_t i;

        for (i = 0; i < ARRAY_SIZE(node_bulk_state_transitions); ++i) {
                node_bulk_state_check(
                        node_bulk_state_transitions[i].nbst_transition,
                        node_bulk_state_transitions[i].nbst_nr);
        }
}

static enum transfer_state
node_bulk_state_search(enum transfer_state state,
                       const struct state_transition state_list[],
                       size_t state_nr)
{
        size_t i;

        for (i = 0; i < state_nr; ++i) {
                if (state_list[i].sta_from == state)
                        return state_list[i].sta_to;
        }
        M0_IMPOSSIBLE("Invalid 'from' state in net-test bulk testing.");
        return TS_UNUSED;
}

static void node_bulk_state_change_cb(struct node_bulk_ctx *ctx,
                                      size_t bs_index,
                                      bool success)
{
        const struct state_transition *transition;
        size_t                         transition_size;
        enum transfer_state            state;

        M0_PRE(ctx != NULL);
        M0_PRE(bs_index < ctx->nbc_bs_nr);
        M0_PRE(ctx->nbc_bs != NULL);

        if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
                transition = success ?
                             node_bulk_client_success :
                             node_bulk_client_failure;
                transition_size = success ?
                                  ARRAY_SIZE(node_bulk_client_success) :
                                  ARRAY_SIZE(node_bulk_client_failure);
        } else if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_SERVER) {
                transition = success ?
                             node_bulk_server_success :
                             node_bulk_server_failure;
                transition_size = success ?
                                  ARRAY_SIZE(node_bulk_server_success) :
                                  ARRAY_SIZE(node_bulk_server_failure);
        } else {
                transition = NULL;
                transition_size = 0;
                M0_IMPOSSIBLE("Invalid node role in net-test bulk testing");
        }
        state = node_bulk_state_search(ctx->nbc_bs[bs_index].bsb_ts,
                                       transition, transition_size);
        node_bulk_state_change(ctx, bs_index, state);
}

void node_bulk_state_transition_auto(struct node_bulk_ctx *ctx,
                                     size_t bs_index)
{
        struct buf_status_bulk *bs;
        bool                    role_client;
        m0_time_t               rtt;

        M0_PRE(ctx != NULL);
        M0_PRE(bs_index < ctx->nbc_bs_nr);

        role_client = ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT;
        bs = &ctx->nbc_bs[bs_index];
        /* Check final states */
        M0_ASSERT(M0_IN(bs->bsb_ts, (TS_TRANSFERRED, TS_FAILED)) ||
                  (!role_client && bs->bsb_ts == TS_BADMSG));
        switch (bs->bsb_ts) {
        case TS_TRANSFERRED:
                rtt = m0_time_sub(bs->bsb_time_finish,
                                  bs->bsb_time_start);
                m0_net_test_nh_sd_update_rtt(&ctx->nbc_nh, rtt);
                break;
        case TS_FAILED:
                /* nothing to do here */
                break;
        case TS_BADMSG:
                /* nothing to do here */
                break;
        default:
                M0_IMPOSSIBLE("Impossible final state in "
                              "net-test bulk testing");
        }
        node_bulk_state_change(ctx, bs_index, TS_UNUSED);
}

void node_bulk_state_transition_auto_all(struct node_bulk_ctx *ctx)
{
        size_t bs_index;
        size_t i;
        size_t nr;

        M0_PRE(ctx != NULL);

        nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_final);
        for (i = 0; i < nr; ++i) {
                bs_index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_final);
                node_bulk_state_transition_auto(ctx, bs_index);
        }
        M0_POST(m0_net_test_ringbuf_is_empty(&ctx->nbc_rb_bulk_final));
}

static void server_process_unused_ping(struct node_bulk_ctx *ctx)
{
        size_t index;
        size_t i;
        size_t nr;
        int    rc;

        M0_PRE(ctx != NULL);

        nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_ping_unused);
        for (i = 0; i < nr; ++i) {
                index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_ping_unused);
                rc = m0_net_test_network_msg_recv(&ctx->nbc_net, index);
                if (rc != 0) {
                        /* can't enqueue: return the buffer to the list */
                        m0_net_test_ringbuf_push(&ctx->nbc_rb_ping_unused,
                                                 index);
                }
        }
}

static struct m0_net_buffer *net_buf_bulk_get(struct node_bulk_ctx *ctx,
                                              size_t buf_bulk_index)
{
        return m0_net_test_network_buf(&ctx->nbc_net, M0_NET_TEST_BUF_BULK,
                                       buf_bulk_index);
}

static void buf_desc_set0(struct node_bulk_ctx *ctx,
                          size_t buf_bulk_index)
{
        M0_PRE(ctx != NULL);
        M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);

        M0_SET0(&net_buf_bulk_get(ctx, buf_bulk_index)->nb_desc);
        M0_SET0(&ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
}

static void buf_desc_swap(struct node_bulk_ctx *ctx,
                          size_t buf_bulk_index)
{
        M0_PRE(ctx != NULL);
        M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);

        M0_SWAP(net_buf_bulk_get(ctx, buf_bulk_index)->nb_desc,
                ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
}

static void buf_desc_server_free(struct node_bulk_ctx *ctx,
                                 size_t buf_bulk_index)
{
        M0_PRE(ctx != NULL);
        M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);

        m0_net_desc_free(&net_buf_bulk_get(ctx, buf_bulk_index)->nb_desc);
        m0_net_desc_free(&ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
}

static void buf_desc_client_free(struct node_bulk_ctx *ctx,
                                 size_t bs_index)
{
        M0_PRE(ctx != NULL);
        M0_PRE(bs_index * 2 + 1 < ctx->nbc_buf_bulk_nr);

        m0_net_desc_free(&net_buf_bulk_get(ctx, bs_index * 2)->nb_desc);
        m0_net_desc_free(&net_buf_bulk_get(ctx, bs_index * 2 + 1)->nb_desc);
}

static m0_bcount_t buf_desc_deserialize(struct node_bulk_ctx *ctx,
                                        size_t buf_bulk_index,
                                        size_t buf_ping_index,
                                        m0_bcount_t offset)
{
        m0_bcount_t len;
        m0_bcount_t len_total;

        M0_PRE(ctx != NULL);
        M0_PRE(buf_ping_index < ctx->nbc_buf_ping_nr);
        M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);

        buf_desc_set0(ctx, buf_bulk_index);
        /* decode network buffer descriptors for active bulk receiving */
        len = m0_net_test_network_bd_serialize(M0_NET_TEST_DESERIALIZE,
                                               &ctx->nbc_net, buf_bulk_index,
                                               buf_ping_index, offset);
        if (len == 0)
                return 0;
        len_total = net_test_len_accumulate(0, len);

        /*
         * buf->nb_desc = zero descriptor
         * bs->bsb_desc_send = descriptor for active bulk receiving
         */
        buf_desc_swap(ctx, buf_bulk_index);

        len = m0_net_test_network_bd_serialize(M0_NET_TEST_DESERIALIZE,
                                               &ctx->nbc_net, buf_bulk_index,
                                               buf_ping_index,
                                               offset + len_total);
        if (len == 0) {
                /* free already allocated network descriptor */
                buf_desc_server_free(ctx, buf_bulk_index);
                return 0;
        }
        len_total = net_test_len_accumulate(len_total, len);

        /*
         * buf->nb_desc = descriptor for active bulk receiving
         * bs->bsb_desc_send = descriptor for active bulk sending
         */
        buf_desc_swap(ctx, buf_bulk_index);

        return len_total;
}

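/*
 * Descriptor exchange, in short: for every transfer the client serializes
 * two network buffer descriptors into a ping message - the descriptor of
 * the passive send buffer first (the server receives from it actively),
 * then the descriptor of the passive recv buffer (the server sends to it
 * actively). buf_desc_deserialize() above decodes them in the same order,
 * and node_bulk_cb_server() treats a message with an odd number of
 * descriptors as a bad message.
 */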
static m0_bcount_t node_bulk_server_transfer_start(struct node_bulk_ctx *ctx,
                                                   size_t buf_ping_index,
                                                   m0_bcount_t offset)
{
        m0_bcount_t len;
        size_t      buf_bulk_index;
        bool        no_unused_buf;
        int         rc;

        no_unused_buf = m0_net_test_ringbuf_is_empty(&ctx->nbc_rb_bulk_unused);
        if (no_unused_buf) {
                LOGD("--- NO UNUSED BUFS");
                /* no unused bulk buffer to receive into */
                return 0;
        }

        /* get unused buf */
        buf_bulk_index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_unused);
        M0_ASSERT(buf_bulk_index < ctx->nbc_buf_bulk_nr);
        node_bulk_state_change(ctx, buf_bulk_index, TS_BD_RECEIVED);
        /* deserialize network buffer descriptors */
        len = buf_desc_deserialize(ctx, buf_bulk_index, buf_ping_index, offset);
        if (len == 0) {
                LOGD("BADMSG: buf_bulk_index = %lu, "
                     "buf_ping_index = %lu, offset = %lu",
                     buf_bulk_index, buf_ping_index, (unsigned long) offset);
                /* ping buffer contains invalid data */
                node_bulk_state_change(ctx, buf_bulk_index, TS_BADMSG);
                return 0;
        }
        node_bulk_state_change(ctx, buf_bulk_index, TS_RECEIVING);
        /* start active bulk receiving */
        rc = m0_net_test_network_bulk_enqueue(&ctx->nbc_net, buf_bulk_index, 0,
                                              M0_NET_QT_ACTIVE_BULK_RECV);
        ctx->nbc_bs[buf_bulk_index].bsb_recv.bse_func_rc = rc;
        if (rc != 0) {
                /*
                 * Adding the buffer to the network queue failed.
                 * Free the network descriptors allocated during
                 * deserialization.
                 */
                node_bulk_state_change(ctx, buf_bulk_index, TS_FAILED);
                buf_desc_server_free(ctx, buf_bulk_index);
        }
        return rc == 0 ? len : 0;
}

static void node_bulk_cb_server(struct node_bulk_ctx *ctx,
                                size_t buf_index,
                                enum m0_net_queue_type q,
                                const struct m0_net_buffer_event *ev)
{
        m0_bcount_t offset;
        m0_bcount_t len;
        size_t      nr;
        size_t      i;
        int         rc;

        M0_PRE(ctx != NULL);
        M0_PRE(ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_SERVER);
        M0_PRE(ergo(q == M0_NET_QT_MSG_RECV, buf_index < ctx->nbc_buf_ping_nr));
        M0_PRE(ergo(M0_IN(q, (M0_NET_QT_ACTIVE_BULK_RECV,
                              M0_NET_QT_ACTIVE_BULK_SEND)),
                    buf_index < ctx->nbc_buf_bulk_nr));

        if (q == M0_NET_QT_MSG_RECV) {
                if (ev->nbe_status != 0)
                        return;
                nr = m0_net_test_network_bd_nr(&ctx->nbc_net, buf_index);
                if (nr % 2 != 0) {
                        LOGD("MS_BAD: nr = %lu", nr);
                        sd_update(ctx, MT_MSG, MS_BAD, MD_RECV);
                        return;
                }
                nr /= 2;
                offset = 0;
                for (i = 0; i < nr; ++i) {
                        len = node_bulk_server_transfer_start(ctx, buf_index,
                                                              offset);
                        offset += len;
                }
        } else if (q == M0_NET_QT_ACTIVE_BULK_RECV) {
                if (ev->nbe_status != 0) {
                        LOGD("--- active bulk recv FAILED!");
                        buf_desc_server_free(ctx, buf_index);
                        return;
                }
                /*
                 * Don't free m0_net_buf_desc here to avoid
                 * memory allocator delays.
                 */
                buf_desc_swap(ctx, buf_index);
                rc = m0_net_test_network_bulk_enqueue(&ctx->nbc_net,
                                                      buf_index, 0,
                                                      M0_NET_QT_ACTIVE_BULK_SEND);
                ctx->nbc_bs[buf_index].bsb_send.bse_func_rc = rc;
                if (rc != 0) {
                        LOGD("--- active bulk send FAILED!");
                        buf_desc_server_free(ctx, buf_index);
                        node_bulk_state_change(ctx, buf_index, TS_FAILED);
                }
        } else if (q == M0_NET_QT_ACTIVE_BULK_SEND) {
                buf_desc_server_free(ctx, buf_index);
        }
}

static void node_bulk_cb_client(struct node_bulk_ctx *ctx,
                                size_t buf_index,
                                enum m0_net_queue_type q,
                                const struct m0_net_buffer_event *ev)
{
        struct buf_status_bulk *bs;

        M0_PRE(ctx != NULL);
        M0_PRE(ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT);
        M0_PRE(ergo(q == M0_NET_QT_MSG_SEND, buf_index < ctx->nbc_buf_ping_nr));
        M0_PRE(ergo(M0_IN(q, (M0_NET_QT_PASSIVE_BULK_RECV,
                              M0_NET_QT_PASSIVE_BULK_SEND)),
                    buf_index < ctx->nbc_buf_bulk_nr));

        if (q == M0_NET_QT_MSG_SEND) {
                /*
                 * Change state for every bulk buffer whose
                 * descriptor is stored in the current message.
                 */
                m0_tl_teardown(bsb, &ctx->nbc_bsp[buf_index].bsp_buffers, bs) {
                        bs->bsb_msg.bse_cb_rc = ev->nbe_status;
                        node_bulk_state_change_cb(ctx, bs->bsb_index,
                                                  ev->nbe_status == 0);
                }
        } else if (M0_IN(q, (M0_NET_QT_PASSIVE_BULK_RECV,
                             M0_NET_QT_PASSIVE_BULK_SEND))) {
                bs = &ctx->nbc_bs[buf_index / 2];
                if (node_bulk_state_is_final(bs->bsb_ts))
                        buf_desc_client_free(ctx, buf_index / 2);
        }
}

static bool node_bulk_is_stopping(struct node_bulk_ctx *ctx)
{
        return m0_atomic64_get(&ctx->nbc_stop_flag) == 1;
}

static void node_bulk_cb(struct m0_net_test_network_ctx *net_ctx,
                         const uint32_t buf_index,
                         enum m0_net_queue_type q,
                         const struct m0_net_buffer_event *ev)
{
        struct buf_status_bulk  *bs;
        size_t                   bs_index;
        struct buf_status_errno *bs_e;
        struct node_bulk_ctx    *ctx = node_bulk_ctx_from_net_ctx(net_ctx);
        bool                     role_client;
        bool                     buf_send;
        bool                     buf_bulk;

        LOGD("node_bulk_cb: tm_addr = %s, buf_index = %u, q = %d"
             ", ev->nbe_status = %d",
             net_ctx->ntc_tm->ntm_ep->nep_addr, buf_index, q, ev->nbe_status);
        M0_PRE(net_ctx != NULL);
        role_client = ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT;
        M0_PRE(ergo(q == M0_NET_QT_MSG_RECV, !role_client));
        M0_PRE(ergo(q == M0_NET_QT_MSG_SEND, role_client));
        M0_PRE(ergo(q == M0_NET_QT_PASSIVE_BULK_RECV, role_client));
        M0_PRE(ergo(q == M0_NET_QT_PASSIVE_BULK_SEND, role_client));
        M0_PRE(ergo(q == M0_NET_QT_ACTIVE_BULK_RECV, !role_client));
        M0_PRE(ergo(q == M0_NET_QT_ACTIVE_BULK_SEND, !role_client));
        M0_PRE(ergo(M0_IN(q, (M0_NET_QT_MSG_RECV, M0_NET_QT_MSG_SEND)),
                    buf_index < ctx->nbc_buf_ping_nr));

        if (ev->nbe_status != 0 && ev->nbe_status != -ECANCELED) {
                LOGD("--CALLBACK ERROR! errno = %d", ev->nbe_status);
                LOGD("node_bulk_cb: tm_addr = %s, buf_index = %u, q = %d"
                     ", ev->nbe_status = %d",
                     net_ctx->ntc_tm->ntm_ep->nep_addr, buf_index, q,
                     ev->nbe_status);
        }

        ctx->nbc_callback_executed = true;
        buf_bulk = false;
        if (M0_IN(q,
                  (M0_NET_QT_PASSIVE_BULK_RECV, M0_NET_QT_PASSIVE_BULK_SEND,
                   M0_NET_QT_ACTIVE_BULK_RECV, M0_NET_QT_ACTIVE_BULK_SEND))) {
                buf_bulk = true;
                bs_index = role_client ? buf_index / 2 : buf_index;
                M0_ASSERT(bs_index < ctx->nbc_bs_nr);
                bs = &ctx->nbc_bs[bs_index];
                bs_e = q == M0_NET_QT_PASSIVE_BULK_RECV ||
                       q == M0_NET_QT_ACTIVE_BULK_RECV ?
                       &bs->bsb_recv :
                       &bs->bsb_send;
                M0_ASSERT(bs_e != NULL);
                bs_e->bse_cb_rc = ev->nbe_status;
                node_bulk_state_change_cb(ctx, bs_index, ev->nbe_status == 0);
        }
        (role_client ? node_bulk_cb_client : node_bulk_cb_server)
        (ctx, buf_index, q, ev);
        /* Synchronise with node_bulk_worker(). */
        m0_mutex_lock(&ctx->nbc_bulk_mutex);
        if (M0_IN(q, (M0_NET_QT_MSG_SEND, M0_NET_QT_MSG_RECV))) {
                /* ping buffer can be reused now */
                m0_net_test_ringbuf_push(&ctx->nbc_rb_ping_unused, buf_index);
        }
        if (!role_client && q == M0_NET_QT_MSG_RECV) {
                /* try to re-enqueue unused ping buffers immediately */
                server_process_unused_ping(ctx);
        }
        /* update stats */
        buf_send = q == M0_NET_QT_PASSIVE_BULK_SEND ||
                   q == M0_NET_QT_ACTIVE_BULK_SEND || q == M0_NET_QT_MSG_SEND;
        sd_update(ctx, buf_bulk ? MT_BULK : MT_MSG,
                  ev->nbe_status == 0 ? MS_SUCCESS : MS_FAILED,
                  buf_send ? MD_SEND : MD_RECV);
        /* state transitions from final states */
        node_bulk_state_transition_auto_all(ctx);
        m0_mutex_unlock(&ctx->nbc_bulk_mutex);
}

static struct m0_net_test_network_buffer_callbacks node_bulk_buf_cb = {
        .ntnbc_cb = {
                [M0_NET_QT_MSG_RECV]          = node_bulk_cb,
                [M0_NET_QT_MSG_SEND]          = node_bulk_cb,
                [M0_NET_QT_PASSIVE_BULK_RECV] = node_bulk_cb,
                [M0_NET_QT_PASSIVE_BULK_SEND] = node_bulk_cb,
                [M0_NET_QT_ACTIVE_BULK_RECV]  = node_bulk_cb,
                [M0_NET_QT_ACTIVE_BULK_SEND]  = node_bulk_cb,
        }
};

static size_t client_server_index(struct node_bulk_ctx *ctx, size_t bs_index)
{
        M0_PRE(ctx != NULL);
        M0_PRE(ctx->nbc_client_concurrency > 0);

        return bs_index / ctx->nbc_client_concurrency;
}

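/*
 * Client buffer indexing used below: bulk buffer 2 * i is enqueued to
 * M0_NET_QT_PASSIVE_BULK_SEND and bulk buffer 2 * i + 1 to
 * M0_NET_QT_PASSIVE_BULK_RECV for transfer status i, so
 * bs_index == buf_index / 2, and the target server index is
 * bs_index / nbc_client_concurrency (see client_server_index() above).
 */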
static int client_bulk_enqueue(struct node_bulk_ctx *ctx,
                               size_t buf_index)
{
        enum m0_net_queue_type q;
        size_t                 server_index;

        q = buf_index % 2 == 0 ? M0_NET_QT_PASSIVE_BULK_SEND :
                                 M0_NET_QT_PASSIVE_BULK_RECV;
        server_index = client_server_index(ctx, buf_index / 2);
        return m0_net_test_network_bulk_enqueue(&ctx->nbc_net, buf_index,
                                                server_index, q);
}

static void client_bulk_dequeue(struct node_bulk_ctx *ctx,
                                size_t buf_index)
{
        m0_net_test_network_buffer_dequeue(&ctx->nbc_net, M0_NET_TEST_BUF_BULK,
                                           buf_index);
}

static void client_transfer_start(struct node_bulk_ctx *ctx,
                                  size_t bs_index)
{
        struct buf_status_bulk *bs;
        int                     rc;

        M0_PRE(ctx != NULL);
        M0_PRE(bs_index < ctx->nbc_bs_nr);

        bs = &ctx->nbc_bs[bs_index];
        node_bulk_state_change(ctx, bs_index, TS_QUEUED);
        rc = client_bulk_enqueue(ctx, bs_index * 2);
        bs->bsb_send.bse_func_rc = rc;
        if (rc != 0) {
                /* nothing was enqueued, so the failure is final */
                node_bulk_state_change(ctx, bs_index, TS_FAILED);
                return;
        }
        rc = client_bulk_enqueue(ctx, bs_index * 2 + 1);
        bs->bsb_recv.bse_func_rc = rc;
        if (rc != 0) {
                node_bulk_state_change(ctx, bs_index, TS_FAILED1);
                client_bulk_dequeue(ctx, bs_index * 2);
                /* the cancelled buffer callback completes the failure */
                return;
        }
        m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_queued, bs_index);
}

static void client_process_unused_bulk(struct node_bulk_ctx *ctx)
{
        size_t i;
        size_t nr;
        size_t bs_index;
        bool   transfer_next;

        M0_PRE(ctx != NULL);

        nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_unused);
        for (i = 0; i < nr; ++i) {
                /* Check stop conditions */
                transfer_next = m0_net_test_nh_transfer_next(&ctx->nbc_nh);
                /*
                LOGD("client: transfer_next = %s",
                     transfer_next ? (char *) "true" : (char *) "false");
                */
                if (!transfer_next)
                        break;
                /* Start next transfer */
                bs_index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_unused);
                /*
                LOGD("client: next transfer bs_index = %lu",
                     bs_index);
                */
                client_transfer_start(ctx, bs_index);
        }
}

static m0_bcount_t client_bds_serialize2(struct m0_net_test_network_ctx *net,
                                         size_t bsb_index,
                                         size_t msg_buf_index,
                                         m0_bcount_t offset)
{
        m0_bcount_t len;
        m0_bcount_t len_total;

        len = m0_net_test_network_bd_serialize(M0_NET_TEST_SERIALIZE,
                                               net, bsb_index * 2,
                                               msg_buf_index, offset);
        if (len == 0)
                return 0;
        len_total = net_test_len_accumulate(0, len);

        len = m0_net_test_network_bd_serialize(M0_NET_TEST_SERIALIZE,
                                               net, bsb_index * 2 + 1,
                                               msg_buf_index,
                                               offset + len_total);
        if (len == 0) {
                /*
                 * If serializing of the first buffer descriptor succeeds,
                 * the number of serialized network buffer descriptors is
                 * increased. If serializing of the second descriptor then
                 * fails, the number of network buffer descriptors inside
                 * the ping buffer must be decreased back, because these
                 * two descriptors should be added either both or not
                 * at all.
                 */
                m0_net_test_network_bd_nr_dec(net, msg_buf_index);
        }
        len_total = net_test_len_accumulate(len_total, len);
        return len_total;
}

static void client_bulk_bufs_dequeue(struct node_bulk_ctx *ctx,
                                     struct buf_status_bulk *bs)
{
        client_bulk_dequeue(ctx, bs->bsb_index * 2);
        client_bulk_dequeue(ctx, bs->bsb_index * 2 + 1);
}

static void client_bds_send(struct node_bulk_ctx *ctx,
                            struct server_status_bulk *ss)
{
        struct m0_net_test_ringbuf *rb_ping = &ctx->nbc_rb_ping_unused;
        struct buf_status_bulk     *bs;
        /* Message buffer was taken from unused list */
        bool                        msg_taken;
        /* Message buffer index, makes sense iff (msg_taken) */
        size_t                      msg_index  = 0;
        /* Number of buffer descriptors in selected message buffer */
        size_t                      msg_bd_nr  = 0;
        struct buf_status_ping     *msg_bs     = NULL;
        m0_bcount_t                 msg_offset = 0;
        m0_bcount_t                 len;
        bool                        buf_last;
        int                         rc;
        struct m0_tl                messages;
        bool                        list_is_empty;

        M0_PRE(ctx != NULL);
        M0_PRE(ss != NULL);
        M0_PRE(ctx->nbc_bd_nr_max > 0 && ctx->nbc_bd_nr_max % 2 == 0);

        bsp_tlist_init(&messages);
        msg_taken = false;
        m0_tl_for(bsb, &ss->ssb_buffers, bs) {
take_msg:
                if (!msg_taken && m0_net_test_ringbuf_is_empty(rb_ping)) {
                        /*
                         * No free message to transfer bulk buffer
                         * network descriptors. Cancel transfers.
                         */
                        node_bulk_state_change(ctx, bs->bsb_index,
                                               TS_FAILED2);
                        client_bulk_bufs_dequeue(ctx, bs);
                        bsb_tlist_del(bs);
                        continue;
                }
                /* Take unused msg buf number if it wasn't taken before */
                if (!msg_taken) {
                        msg_index  = m0_net_test_ringbuf_pop(rb_ping);
                        msg_taken  = true;
                        msg_bd_nr  = 0;
                        msg_offset = 0;
                        msg_bs     = &ctx->nbc_bsp[msg_index];
                        list_is_empty =
                                bsb_tlist_is_empty(&msg_bs->bsp_buffers);
                        M0_ASSERT(list_is_empty);
                }
                /* Try to serialize two buffer descriptors */
                len = client_bds_serialize2(&ctx->nbc_net, bs->bsb_index,
                                            msg_index, msg_offset);
                /*
                LOGD("msg_index = %lu, len = %lu, msg_offset = %lu",
                     (unsigned long) msg_index, (unsigned long) len,
                     (unsigned long) msg_offset);
                */
                msg_offset += len;
                if (len == 0) {
                        if (msg_bd_nr > 0) {
                                /* No free space in ping buffer */
                                bsp_tlist_add_tail(&messages, msg_bs);
                                msg_taken = false;
                                goto take_msg;
                        } else {
                                /*
                                 * Serializing failed for unknown reason
                                 * (or ping buffer is smaller than
                                 * size of two serialized bulk
                                 * network buffer descriptors)
                                 */
                                node_bulk_state_change(ctx, bs->bsb_index,
                                                       TS_FAILED2);
                                client_bulk_bufs_dequeue(ctx, bs);
                                bsb_tlist_del(bs);
                                msg_taken = false;
                        }
                } else {
                        buf_last = bsb_tlist_next(&ss->ssb_buffers, bs) == NULL;
                        bsb_tlist_del(bs);
                        bsb_tlist_add_tail(&msg_bs->bsp_buffers, bs);
                        msg_bd_nr += 2;
                        if (msg_bd_nr == ctx->nbc_bd_nr_max || buf_last) {
                                bsp_tlist_add_tail(&messages, msg_bs);
                                msg_taken = false;
                        }
                }
        } m0_tl_endfor;
        M0_ASSERT(!msg_taken);
        m0_tl_for(bsp, &messages, msg_bs) {
                list_is_empty = bsb_tlist_is_empty(&msg_bs->bsp_buffers);
                M0_ASSERT(!list_is_empty);
                /*
                 * Change state to BD_SENT for every bulk buffer whose
                 * descriptor is stored in the current message.
                 */
                m0_tl_for(bsb, &msg_bs->bsp_buffers, bs) {
                        node_bulk_state_change(ctx, bs->bsb_index, TS_BD_SENT);
                } m0_tl_endfor;
                rc = m0_net_test_network_msg_send(&ctx->nbc_net,
                                                  msg_bs->bsp_index,
                                                  ss->ssb_index);
                if (rc != 0) {
                        LOGD("--- msg send FAILED!");
                        /* no send callback will come: recycle the buffer */
                        m0_net_test_ringbuf_push(rb_ping, msg_bs->bsp_index);
                }
                /* Save rc for future analysis */
                m0_tl_for(bsb, &msg_bs->bsp_buffers, bs) {
                        bs->bsb_msg.bse_func_rc = rc;
                        /* Change state if msg sending failed */
                        if (rc != 0) {
                                node_bulk_state_change(ctx, bs->bsb_index,
                                                       TS_FAILED2);
                                client_bulk_bufs_dequeue(ctx, bs);
                                bsb_tlist_del(bs);
                        }
                } m0_tl_endfor;
                bsp_tlist_del(msg_bs);
        } m0_tl_endfor;
        bsp_tlist_fini(&messages);
}

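/*
 * client_bds_send() packing scheme, in short: transfers queued for one
 * server are packed into unused ping buffers - descriptor pairs are
 * serialized into the current message until nbc_bd_nr_max descriptors
 * are packed or the buffer is full, then the message is sent to the
 * server and the next unused ping buffer is taken. Transfers that can
 * not be packed (no unused ping buffer left or serializing failed) are
 * cancelled and moved to TS_FAILED2.
 */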
static void client_process_queued_bulk(struct node_bulk_ctx *ctx)
{
        struct server_status_bulk *ss;
        struct buf_status_bulk    *bs;
        struct m0_tl               servers;
        size_t                     index;
        size_t                     i;
        size_t                     nr;

        M0_PRE(ctx != NULL);

        ssb_tlist_init(&servers);
        nr = m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_queued);
        /* Add each queued buffer to the per-server list of queued buffers */
        for (i = 0; i < nr; ++i) {
                index = m0_net_test_ringbuf_pop(&ctx->nbc_rb_bulk_queued);
                bs = &ctx->nbc_bs[index];
                ss = &ctx->nbc_sstatus[client_server_index(ctx, index)];
                bsb_tlist_add_tail(&ss->ssb_buffers, bs);
                if (!ssb_tlink_is_in(ss))
                        ssb_tlist_add_tail(&servers, ss);
        }
        /* Send a message with buffer descriptors to every server involved */
        m0_tl_teardown(ssb, &servers, ss) {
                client_bds_send(ctx, ss);
        }
        ssb_tlist_fini(&servers);
}

static void node_bulk_buf_unused(struct node_bulk_ctx *ctx)
{
        size_t i;

        M0_PRE(ctx != NULL);

        for (i = 0; i < ctx->nbc_bs_nr; ++i) {
                ctx->nbc_bs[i].bsb_ts = TS_UNUSED;
                m0_net_test_ringbuf_push(&ctx->nbc_rb_bulk_unused, i);
        }
        for (i = 0; i < ctx->nbc_buf_ping_nr; ++i)
                m0_net_test_ringbuf_push(&ctx->nbc_rb_ping_unused, i);
}

static void node_bulk_buf_dequeue(struct node_bulk_ctx *ctx)
{
        size_t i;

        M0_PRE(ctx != NULL);
        for (i = 0; i < ctx->nbc_buf_ping_nr; ++i) {
                m0_net_test_network_buffer_dequeue(&ctx->nbc_net,
                                                   M0_NET_TEST_BUF_PING, i);
        }
        for (i = 0; i < ctx->nbc_buf_bulk_nr; ++i) {
                m0_net_test_network_buffer_dequeue(&ctx->nbc_net,
                                                   M0_NET_TEST_BUF_BULK, i);
        }
}

static bool node_bulk_bufs_unused_all(struct node_bulk_ctx *ctx)
{
        M0_PRE(ctx != NULL);

        return m0_net_test_ringbuf_nr(&ctx->nbc_rb_ping_unused) ==
               ctx->nbc_buf_ping_nr &&
               m0_net_test_ringbuf_nr(&ctx->nbc_rb_bulk_unused) ==
               ctx->nbc_bs_nr;
}

static void net_bulk_worker_cb(struct node_bulk_ctx *ctx, bool pending)
{
        if (USE_LIBFAB) {
                /* execute network buffer callbacks in this thread context */
                m0_net_buffer_event_deliver_all(ctx->nbc_net.ntc_tm);
                M0_ASSERT(ergo(pending, ctx->nbc_callback_executed));
                /* update copy of statistics */
                m0_net_test_nh_sd_copy_locked(&ctx->nbc_nh);
                /*
                 * In case of libfabric, there are no pending callbacks,
                 * hence do not wait for callback in m0_chan_wait() after
                 * calling m0_net_buffer_event_deliver_all().
                 */
        } else {
                ctx->nbc_callback_executed = false;
                /* execute network buffer callbacks in this thread context */
                m0_net_buffer_event_deliver_all(ctx->nbc_net.ntc_tm);
                M0_ASSERT(ergo(pending, ctx->nbc_callback_executed));
                /* state transitions from final states */
                node_bulk_state_transition_auto_all(ctx);
                /* update copy of statistics */
                m0_net_test_nh_sd_copy_locked(&ctx->nbc_nh);
                /* wait for STOP command or buffer event */
                if (!ctx->nbc_callback_executed)
                        m0_chan_wait(&ctx->nbc_stop_clink);
        }
}

static void node_bulk_worker(struct node_bulk_ctx *ctx)
{
        struct m0_clink tm_clink;
        struct m0_chan  tm_chan       = {};
        struct m0_mutex tm_chan_mutex = {};
        bool            pending;
        bool            running;

        M0_PRE(ctx != NULL);

        /* all buffers are unused */
        node_bulk_buf_unused(ctx);
        /* attach tm_clink to clink group to wait for two signals at once */
        m0_clink_attach(&tm_clink, &ctx->nbc_stop_clink, NULL);
        /*
         * Init wait channel and clink.
         * Transfer machine will signal to this channel.
         */
        m0_mutex_init(&tm_chan_mutex);
        m0_chan_init(&tm_chan, &tm_chan_mutex);
        m0_clink_add_lock(&tm_chan, &tm_clink);
        /* main loop */
        running = true;
        while (running || !node_bulk_bufs_unused_all(ctx)) {
                /* Synchronise with node_bulk_cb(). */
                m0_mutex_lock(&ctx->nbc_bulk_mutex);
                if (running) {
                        if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
                                client_process_unused_bulk(ctx);
                                client_process_queued_bulk(ctx);
                        } else {
                                server_process_unused_ping(ctx);
                        }
                }
                /* notification for buffer events */
                pending = m0_net_buffer_event_pending(ctx->nbc_net.ntc_tm);
                if (!pending) {
                        m0_net_buffer_event_notify(ctx->nbc_net.ntc_tm,
                                                   &tm_chan);
                }
                net_bulk_worker_cb(ctx, pending);
                if (running && node_bulk_is_stopping(ctx)) {
                        /* dequeue all queued network buffers */
                        node_bulk_buf_dequeue(ctx);
                        running = false;
                }
                m0_mutex_unlock(&ctx->nbc_bulk_mutex);
        }
        /* transfer machine shouldn't signal to tm_chan */
        m0_net_buffer_event_notify(ctx->nbc_net.ntc_tm, NULL);
        m0_clink_del_lock(&tm_clink);
        m0_clink_fini(&tm_clink);
        m0_chan_fini_lock(&tm_chan);
        m0_mutex_fini(&tm_chan_mutex);
}

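/*
 * Buffer event handling, in short: the network context is configured with
 * ntncfg_sync in node_bulk_test_init_fini(), so buffer callbacks are not
 * delivered asynchronously. The worker checks for queued events with
 * m0_net_buffer_event_pending(), asks the transfer machine to signal
 * tm_chan on event arrival with m0_net_buffer_event_notify(), and
 * net_bulk_worker_cb() executes the callbacks in the worker thread via
 * m0_net_buffer_event_deliver_all().
 */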
static int node_bulk_test_init_fini(struct node_bulk_ctx *ctx,
                                    const struct m0_net_test_cmd *cmd)
{
        struct m0_net_test_network_timeouts *timeouts;
        struct m0_net_test_network_cfg       net_cfg;
        const struct m0_net_test_cmd_init   *icmd;
        struct server_status_bulk           *ss;
        struct buf_status_ping              *msg_bs;
        int                                  rc;
        size_t                               i;
        bool                                 role_client;
        m0_time_t                            to_send;
        m0_time_t                            to_bulk;
        size_t                               nr;

        M0_PRE(ctx != NULL);

        if (cmd == NULL)
                goto fini;
        icmd        = &cmd->ntc_init;
        role_client = icmd->ntci_role == M0_NET_TEST_ROLE_CLIENT;

        rc = -ENOMEM;
        M0_ALLOC_ARR(ctx->nbc_bs, ctx->nbc_bs_nr);
        if (ctx->nbc_bs == NULL)
                goto fail;
        for (i = 0; i < ctx->nbc_bs_nr; ++i) {
                ctx->nbc_bs[i].bsb_index = i;
                bsb_tlink_init(&ctx->nbc_bs[i]);
                M0_SET0(&ctx->nbc_bs[i].bsb_desc_send);
        }

        if (role_client) {
                M0_ALLOC_ARR(ctx->nbc_sstatus, icmd->ntci_ep.ntsl_nr);
                if (ctx->nbc_sstatus == NULL)
                        goto free_bs_bulk;
                for (i = 0; i < icmd->ntci_ep.ntsl_nr; ++i) {
                        ss = &ctx->nbc_sstatus[i];
                        ss->ssb_index = i;
                        bsb_tlist_init(&ss->ssb_buffers);
                        ssb_tlink_init(ss);
                }
                M0_ALLOC_ARR(ctx->nbc_bsp, ctx->nbc_buf_ping_nr);
                if (ctx->nbc_bsp == NULL)
                        goto free_sstatus;
                for (i = 0; i < ctx->nbc_buf_ping_nr; ++i) {
                        msg_bs = &ctx->nbc_bsp[i];
                        msg_bs->bsp_index = i;
                        bsb_tlist_init(&msg_bs->bsp_buffers);
                        bsp_tlink_init(msg_bs);
                }
        }

        M0_ASSERT(equi(role_client, ctx->nbc_sstatus != NULL));

        rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_ping_unused,
                                      ctx->nbc_buf_ping_nr);
        if (rc != 0)
                goto free_bsp;
        rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_bulk_unused,
                                      ctx->nbc_bs_nr);
        if (rc != 0)
                goto free_rb_ping_unused;
        rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_bulk_queued,
                                      ctx->nbc_bs_nr);
        if (rc != 0)
                goto free_rb_bulk_unused;
        rc = m0_net_test_ringbuf_init(&ctx->nbc_rb_bulk_final,
                                      ctx->nbc_bs_nr);
        if (rc != 0)
                goto free_rb_bulk_queued;

        M0_SET0(&net_cfg);
        net_cfg.ntncfg_tm_cb         = node_bulk_tm_cb;
        net_cfg.ntncfg_buf_cb        = node_bulk_buf_cb;
        net_cfg.ntncfg_buf_size_ping = ctx->nbc_buf_size_ping,
        net_cfg.ntncfg_buf_ping_nr   = ctx->nbc_buf_ping_nr,
        net_cfg.ntncfg_buf_size_bulk = ctx->nbc_buf_size_bulk,
        net_cfg.ntncfg_buf_bulk_nr   = ctx->nbc_buf_bulk_nr,
        net_cfg.ntncfg_ep_max        = icmd->ntci_ep.ntsl_nr,
        net_cfg.ntncfg_timeouts      = m0_net_test_network_timeouts_never(),
        net_cfg.ntncfg_sync          = true;
        /* configure timeouts */
        to_send  = icmd->ntci_buf_send_timeout;
        to_bulk  = icmd->ntci_buf_bulk_timeout;
        timeouts = &net_cfg.ntncfg_timeouts;
        timeouts->ntnt_timeout[M0_NET_QT_MSG_SEND]          = to_send;
        timeouts->ntnt_timeout[M0_NET_QT_PASSIVE_BULK_RECV] = to_bulk;
        timeouts->ntnt_timeout[M0_NET_QT_PASSIVE_BULK_SEND] = to_bulk;
        timeouts->ntnt_timeout[M0_NET_QT_ACTIVE_BULK_RECV]  = to_bulk;
        timeouts->ntnt_timeout[M0_NET_QT_ACTIVE_BULK_SEND]  = to_bulk;

        rc = m0_net_test_network_ctx_init(&ctx->nbc_net, &net_cfg,
                                          icmd->ntci_tm_ep);
        if (rc != 0)
                goto free_rb_bulk_final;
        rc = m0_net_test_network_ep_add_slist(&ctx->nbc_net, &icmd->ntci_ep);
        if (rc != 0)
                goto fini;
        m0_mutex_init(&ctx->nbc_stop_chan_mutex);
        m0_mutex_init(&ctx->nbc_bulk_mutex);
        m0_chan_init(&ctx->nbc_stop_chan, &ctx->nbc_stop_chan_mutex);
        m0_clink_init(&ctx->nbc_stop_clink, NULL);
        m0_clink_add_lock(&ctx->nbc_stop_chan, &ctx->nbc_stop_clink);
        goto success;
fini:
        rc = 0;
        m0_clink_del_lock(&ctx->nbc_stop_clink);
        m0_clink_fini(&ctx->nbc_stop_clink);
        m0_chan_fini_lock(&ctx->nbc_stop_chan);
        m0_mutex_fini(&ctx->nbc_stop_chan_mutex);
        m0_mutex_fini(&ctx->nbc_bulk_mutex);
        m0_net_test_network_ctx_fini(&ctx->nbc_net);
free_rb_bulk_final:
        m0_net_test_ringbuf_fini(&ctx->nbc_rb_bulk_final);
free_rb_bulk_queued:
        m0_net_test_ringbuf_fini(&ctx->nbc_rb_bulk_queued);
free_rb_bulk_unused:
        m0_net_test_ringbuf_fini(&ctx->nbc_rb_bulk_unused);
free_rb_ping_unused:
        m0_net_test_ringbuf_fini(&ctx->nbc_rb_ping_unused);
free_bsp:
        if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
                for (i = 0; i < ctx->nbc_buf_ping_nr; ++i) {
                        msg_bs = &ctx->nbc_bsp[i];
                        bsp_tlink_fini(msg_bs);
                        bsb_tlist_fini(&msg_bs->bsp_buffers);
                }
                m0_free(ctx->nbc_bsp);
        }
free_sstatus:
        if (ctx->nbc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
                nr = cmd != NULL ? cmd->ntc_init.ntci_ep.ntsl_nr :
                                   ctx->nbc_net.ntc_ep_nr;
                for (i = 0; i < nr; ++i) {
                        ss = &ctx->nbc_sstatus[i];
                        ssb_tlink_fini(ss);
                        bsb_tlist_fini(&ss->ssb_buffers);
                }
                m0_free(ctx->nbc_sstatus);
        }
free_bs_bulk:
        for (i = 0; i < ctx->nbc_bs_nr; ++i)
                bsb_tlink_fini(&ctx->nbc_bs[i]);
        m0_free(ctx->nbc_bs);
fail:
success:
        return rc;
}

static void *node_bulk_initfini(void *ctx_, struct m0_net_test_service *svc)
{
        struct node_bulk_ctx *ctx;
        int                   rc;

        M0_PRE(equi(ctx_ == NULL, svc != NULL));

        if (svc != NULL) {
                M0_ALLOC_PTR(ctx);
                if (ctx != NULL) {
                        ctx->nbc_svc                      = svc;
                        ctx->nbc_nh.ntnh_test_initialized = false;
                }
        } else {
                ctx = ctx_;
                rc  = node_bulk_test_init_fini(ctx, NULL);
                M0_ASSERT(rc == 0);
                m0_free(ctx);
        }
        return svc != NULL ? ctx : NULL;
}

static void *node_bulk_init(struct m0_net_test_service *svc)
{
        return node_bulk_initfini(NULL, svc);
}

static void node_bulk_fini(void *ctx_)
{
        void *rc = node_bulk_initfini(ctx_, NULL);
        M0_POST(rc == NULL);
}

static int node_bulk_step(void *ctx_)
{
        return 0;
}

static int node_bulk_cmd_init(void *ctx_,
                              const struct m0_net_test_cmd *cmd,
                              struct m0_net_test_cmd *reply)
{
        const struct m0_net_test_cmd_init *icmd;
        struct node_bulk_ctx              *ctx = ctx_;
        int                                rc;
        bool                               role_client;

        M0_PRE(ctx != NULL);
        M0_PRE(cmd != NULL);
        M0_PRE(reply != NULL);

        if (ctx->nbc_nh.ntnh_test_initialized) {
                rc = -EALREADY;
                goto reply;
        }
        icmd = &cmd->ntc_init;
        m0_net_test_nh_init(&ctx->nbc_nh, icmd);
        role_client            = icmd->ntci_role == M0_NET_TEST_ROLE_CLIENT;
        ctx->nbc_buf_size_bulk = icmd->ntci_msg_size;
        ctx->nbc_buf_ping_nr   = icmd->ntci_bd_buf_nr;
        ctx->nbc_buf_size_ping = icmd->ntci_bd_buf_size;
        ctx->nbc_bd_nr_max     = icmd->ntci_bd_nr_max;
        ctx->nbc_bs_nr         = icmd->ntci_msg_concurrency;
        ctx->nbc_buf_bulk_nr   = icmd->ntci_msg_concurrency;

        if (role_client) {
                ctx->nbc_client_concurrency = icmd->ntci_msg_concurrency;
                ctx->nbc_buf_bulk_nr       *= 2 * icmd->ntci_ep.ntsl_nr;
                ctx->nbc_bs_nr              = ctx->nbc_buf_bulk_nr / 2;
        }

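        /*
         * Example of the resulting accounting for a test client with
         * ntci_msg_concurrency == 4 and two test servers (ntsl_nr == 2):
         * nbc_client_concurrency = 4,
         * nbc_buf_bulk_nr = 4 * 2 * 2 = 16 (a passive send/recv buffer
         * pair per transfer), nbc_bs_nr = 16 / 2 = 8 concurrent transfer
         * statuses, i.e. 4 per server.
         */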
        /* do sanity check */
        rc = -EINVAL;
        if (!ergo(role_client, ctx->nbc_buf_bulk_nr % 2 == 0) ||
            ctx->nbc_buf_bulk_nr < 1 || ctx->nbc_buf_size_bulk < 1 ||
            ctx->nbc_buf_ping_nr < 1 || ctx->nbc_buf_size_ping < 1 ||
            ctx->nbc_bd_nr_max < 1 || ctx->nbc_bs_nr < 1 ||
            (ctx->nbc_nh.ntnh_role != M0_NET_TEST_ROLE_CLIENT &&
             ctx->nbc_nh.ntnh_role != M0_NET_TEST_ROLE_SERVER) ||
            !ergo(role_client, ctx->nbc_client_concurrency != 0) ||
            !ergo(!role_client, ctx->nbc_bs_nr == ctx->nbc_buf_bulk_nr) ||
            !ergo(role_client, 2 * ctx->nbc_bs_nr == ctx->nbc_buf_bulk_nr))
                goto reply;

        rc = node_bulk_test_init_fini(ctx, cmd);
reply:
        /* fill reply */
        reply->ntc_type            = M0_NET_TEST_CMD_INIT_DONE;
        reply->ntc_done.ntcd_errno = rc;
        return rc;
}

static int node_bulk_cmd_start(void *ctx_,
                               const struct m0_net_test_cmd *cmd,
                               struct m0_net_test_cmd *reply)
{
        struct m0_net_test_cmd_status_data *sd;
        int                                 rc;
        struct node_bulk_ctx               *ctx = ctx_;
        const m0_time_t                     _1s = M0_MKTIME(1, 0);

        M0_PRE(ctx != NULL);
        M0_PRE(cmd != NULL);
        M0_PRE(reply != NULL);

        sd = &ctx->nbc_nh.ntnh_sd;

        /* fill test start time */
        sd->ntcsd_time_start = m0_time_now();
        /* initialize stats */
        m0_net_test_mps_init(&sd->ntcsd_mps_send, 0, sd->ntcsd_time_start, _1s);
        m0_net_test_mps_init(&sd->ntcsd_mps_recv, 0, sd->ntcsd_time_start, _1s);
        m0_atomic64_set(&ctx->nbc_stop_flag, 0);
        rc = M0_THREAD_INIT(&ctx->nbc_thread, struct node_bulk_ctx *, NULL,
                            &node_bulk_worker, ctx, "net-test bulk");
        if (rc != 0) {
                /* change service state */
                m0_net_test_service_state_change(ctx->nbc_svc,
                                                 M0_NET_TEST_SERVICE_FAILED);
        }
        /* fill reply */
        reply->ntc_type            = M0_NET_TEST_CMD_START_DONE;
        reply->ntc_done.ntcd_errno = rc;
        return rc;
}

static int node_bulk_cmd_stop(void *ctx_,
                              const struct m0_net_test_cmd *cmd,
                              struct m0_net_test_cmd *reply)
{
        struct node_bulk_ctx *ctx = ctx_;
        int                   rc;

        M0_PRE(ctx != NULL);
        M0_PRE(cmd != NULL);
        M0_PRE(reply != NULL);

        if (!ctx->nbc_nh.ntnh_test_initialized) {
                reply->ntc_done.ntcd_errno = -EINVAL;
                goto reply;
        }
        /* stop worker thread */
        m0_atomic64_set(&ctx->nbc_stop_flag, 1);
        m0_chan_signal_lock(&ctx->nbc_stop_chan);
        rc = m0_thread_join(&ctx->nbc_thread);
        M0_ASSERT(rc == 0);
        m0_thread_fini(&ctx->nbc_thread);
        /* change service state */
        m0_net_test_service_state_change(ctx->nbc_svc,
                                         M0_NET_TEST_SERVICE_FINISHED);
        /* fill reply */
        reply->ntc_done.ntcd_errno = 0;
reply:
        reply->ntc_type = M0_NET_TEST_CMD_STOP_DONE;
        return 0;
}

static int node_bulk_cmd_status(void *ctx_,
                                const struct m0_net_test_cmd *cmd,
                                struct m0_net_test_cmd *reply)
{
        struct node_bulk_ctx *ctx = ctx_;

        M0_PRE(ctx != NULL);

        m0_net_test_nh_cmd_status(&ctx->nbc_nh, cmd, reply);
        return 0;
}

static struct m0_net_test_service_cmd_handler node_bulk_cmd_handler[] = {
        {
                .ntsch_type    = M0_NET_TEST_CMD_INIT,
                .ntsch_handler = node_bulk_cmd_init,
        },
        {
                .ntsch_type    = M0_NET_TEST_CMD_START,
                .ntsch_handler = node_bulk_cmd_start,
        },
        {
                .ntsch_type    = M0_NET_TEST_CMD_STOP,
                .ntsch_handler = node_bulk_cmd_stop,
        },
        {
                .ntsch_type    = M0_NET_TEST_CMD_STATUS,
                .ntsch_handler = node_bulk_cmd_status,
        },
};

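/*
 * Service operations for the bulk test node, invoked through the net-test
 * service command dispatch (see net/test/service.h):
 * node_bulk_init()/node_bulk_fini() create and destroy the per-service
 * node_bulk_ctx, and the handlers above serve the test commands.
 */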
struct m0_net_test_service_ops m0_net_test_node_bulk_ops = {
        .ntso_init           = node_bulk_init,
        .ntso_fini           = node_bulk_fini,
        .ntso_step           = node_bulk_step,
        .ntso_cmd_handler    = node_bulk_cmd_handler,
        .ntso_cmd_handler_nr = ARRAY_SIZE(node_bulk_cmd_handler),
};

int m0_net_test_node_bulk_init(void)
{
        node_bulk_state_check_all();
        return 0;
}

void m0_net_test_node_bulk_fini(void)
{
}

#undef NET_TEST_MODULE_NAME

/*
 * Local variables:
 * c-indentation-style: "K&R"
 * c-basic-offset: 8
 * tab-width: 8
 * fill-column: 79
 * scroll-step: 1
 * End:
 */