Motr  M0
libfab.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2021 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
176 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET
177 #include "lib/trace.h" /* M0_ENTRY() */
178 
179 #ifdef ENABLE_LIBFAB
180 
181 #include <netinet/in.h> /* INET_ADDRSTRLEN */
182 #include <arpa/inet.h> /* inet_pton, htons */
183 #include <sched.h> /* sched_yield */
184 #include <stdlib.h> /* atoi */
185 #include <sys/epoll.h> /* struct epoll_event */
186 #include <unistd.h> /* close */
187 #include "net/buffer_pool.h" /* struct m0_net_buffer_pool */
188 #include "net/net.h" /* struct m0_net_domain */
189 #include "lib/errno.h" /* errno */
190 #include "lib/finject.h" /* M0_FI_ENABLED */
191 #include "lib/hash.h" /* m0_htable */
192 #include "lib/memory.h" /* M0_ALLOC_PTR()*/
193 #include "lib/processor.h" /* m0_processor_is_vm()*/
194 #include "libfab_internal.h"
195 #include "lib/string.h" /* m0_streq */
196 #include "net/net_internal.h" /* m0_net__buffer_invariant() */
197 
/*
 * Helpers for carrying a 32-bit value through a void * slot and back.
 *
 * The argument is parenthesised so that an expression argument is converted
 * as a whole; without the parentheses the cast binds only to the first
 * operand of the expression (e.g. "x + 1" would be widened before the
 * addition instead of after it).
 */
#define U32_TO_VPTR(a) ((void *)((uintptr_t)(a)))
#define VPTR_TO_U32(a) ((uint32_t)((uintptr_t)(a)))
200 
201 /* Assert the equivalence of return code for libfabric and motr */
202 M0_BASSERT(FI_SUCCESS == 0);
203 
/*
 * Provider name strings, indexed by the FAB_FABRIC_PROV_* constants.
 * The array is sized by FAB_FABRIC_PROV_MAX; only the verbs, tcp and
 * sockets slots are given a name here.
 */
204 static const char *providers[FAB_FABRIC_PROV_MAX] = {
205  [FAB_FABRIC_PROV_VERBS] = "verbs",
206  [FAB_FABRIC_PROV_TCP] = "tcp",
207  [FAB_FABRIC_PROV_SOCK] = "sockets" };
208 
/*
 * Typed-list descriptors and definitions used by this file:
 *  - fab_buf    : struct m0_fab__buf linked through fb_linkage;
 *  - fab_sndbuf : struct m0_fab__buf linked through fb_snd_link;
 *  - fab_fabs   : struct m0_fab__fab linked through fab_link;
 *  - fab_bulk   : struct m0_fab__bulk_op linked through fbl_link.
 *
 * NOTE(review): every M0_TL_DESCR_DEFINE() below is cut short in this
 * listing -- source lines 211, 216, 221 and 226 are absent, so the trailing
 * arguments and the closing ");" are not visible. Restore them from the
 * repository before compiling.
 */
209 M0_TL_DESCR_DEFINE(fab_buf, "libfab_buf",
210  static, struct m0_fab__buf, fb_linkage, fb_magic,
212 M0_TL_DEFINE(fab_buf, static, struct m0_fab__buf);
213 
214 M0_TL_DESCR_DEFINE(fab_sndbuf, "libfab_sndbuf",
215  static, struct m0_fab__buf, fb_snd_link, fb_sndmagic,
217 M0_TL_DEFINE(fab_sndbuf, static, struct m0_fab__buf);
218 
219 M0_TL_DESCR_DEFINE(fab_fabs, "libfab_fabrics",
220  static, struct m0_fab__fab, fab_link, fab_magic,
222 M0_TL_DEFINE(fab_fabs, static, struct m0_fab__fab);
223 
224 M0_TL_DESCR_DEFINE(fab_bulk, "libfab_bulkops",
225  static, struct m0_fab__bulk_op, fbl_link, fbl_magic,
227 M0_TL_DEFINE(fab_bulk, static, struct m0_fab__bulk_op);
228 
229 static uint32_t libfab_bht_func(const struct m0_htable *ht, const void *key)
230 {
231  const union m0_fab__token *token = key;
232 
233  /*
234  * Max buckets = ((M0_NET_QT_NR + 1) * FAB_NUM_BUCKETS_PER_QTYPE)
235  * The bucket id is defined by the queue_id and the queue_num
236  * fields of the token
237  */
238  return ((token->t_Fields.tf_queue_id * FAB_NUM_BUCKETS_PER_QTYPE) +
239  token->t_Fields.tf_queue_num);
240 }
241 
242 static bool libfab_bht_key_eq(const void *key1, const void *key2)
243 {
244  const union m0_fab__token *token1 = key1;
245  const union m0_fab__token *token2 = key2;
246 
247  return token1->t_val == token2->t_val;
248 }
249 
/*
 * Hash table of struct m0_fab__buf, keyed by the fb_token member, hashed by
 * libfab_bht_func() and compared by libfab_bht_key_eq().
 *
 * NOTE(review): source line 252 is absent from this listing, so one
 * argument line of M0_HT_DESCR_DEFINE() is not visible here -- restore it
 * from the repository before compiling.
 */
250 M0_HT_DESCR_DEFINE(fab_bufhash, "Hash of bufs", static, struct m0_fab__buf,
251  fb_htlink, fb_htmagic, M0_NET_LIBFAB_BUF_HT_MAGIC,
253  fb_token, libfab_bht_func, libfab_bht_key_eq);
254 
255 M0_HT_DEFINE(fab_bufhash, static, struct m0_fab__buf, uint32_t);
256 
257 static int libfab_ep_txres_init(struct m0_fab__active_ep *aep,
258  struct m0_fab__tm *tm, void *ctx);
259 static int libfab_ep_rxres_init(struct m0_fab__active_ep *aep,
260  struct m0_fab__tm *tm, void *ctx);
261 static int libfab_pep_res_init(struct m0_fab__passive_ep *pep,
262  struct m0_fab__tm *tm, void *ctx);
263 static struct m0_fab__ep *libfab_ep(struct m0_net_end_point *net);
264 static int libfab_ep_find(struct m0_net_transfer_mc *tm, const char *name,
265  struct m0_net_ip_params *addr,
266  struct m0_net_end_point **epp);
267 static int libfab_ep_create(struct m0_net_transfer_mc *tm, const char *name,
268  struct m0_net_ip_params *addr,
269  struct m0_net_end_point **epp);
270 static int libfab_active_ep_create(struct m0_fab__ep *ep,
271  struct m0_fab__tm *tm);
272 static int libfab_passive_ep_create(struct m0_fab__ep *ep,
273  struct m0_fab__tm *tm);
274 static int libfab_aep_param_free(struct m0_fab__active_ep *aep,
275  struct m0_fab__tm *tm);
276 static int libfab_pep_param_free(struct m0_fab__passive_ep *pep,
277  struct m0_fab__tm *tm);
278 static int libfab_ep_param_free(struct m0_fab__ep *ep, struct m0_fab__tm *tm);
279 static int libfab_pep_res_free(struct m0_fab__pep_res *pep_res,
280  struct m0_fab__tm *tm);
281 static int libfab_ep_txres_free(struct m0_fab__tx_res *tx_res,
282  struct m0_fab__tm *tm);
283 static int libfab_ep_rxres_free(struct m0_fab__rx_res *rx_res,
284  struct m0_fab__tm *tm);
285 static void libfab_poller(struct m0_fab__tm *ma);
286 static int libfab_waitfd_init(struct m0_fab__tm *tm);
287 static void libfab_tm_event_post(struct m0_fab__tm *tm,
288  enum m0_net_tm_state state);
289 static inline void libfab_tm_lock(struct m0_fab__tm *tm);
290 static inline void libfab_tm_unlock(struct m0_fab__tm *tm);
291 static inline void libfab_tm_evpost_lock(struct m0_fab__tm *tm);
292 static inline void libfab_tm_evpost_unlock(struct m0_fab__tm *tm);
293 static inline bool libfab_tm_is_locked(const struct m0_fab__tm *tm);
294 static void libfab_buf_complete(struct m0_fab__buf *buf);
295 static void libfab_buf_done(struct m0_fab__buf *buf, int rc, bool add_to_list);
296 static inline struct m0_fab__tm *libfab_buf_tm(struct m0_fab__buf *buf);
297 static inline struct m0_fab__tm *libfab_buf_ma(struct m0_net_buffer *buf);
298 static bool libfab_tm_invariant(const struct m0_fab__tm *tm);
299 static void libfab_buf_del(struct m0_net_buffer *nb);
300 static inline void libfab_ep_get(struct m0_fab__ep *ep);
301 static void libfab_ep_release(struct m0_ref *ref);
302 static uint64_t libfab_mr_keygen(struct m0_fab__tm *tm);
303 static int libfab_check_for_event(struct fid_eq *eq, uint32_t *ev);
304 static int libfab_check_for_comp(struct fid_cq *cq, uint32_t *ctx,
305  m0_bindex_t *len, uint64_t *rem_cq_data);
306 static void libfab_tm_fini(struct m0_net_transfer_mc *tm);
307 static int libfab_buf_dom_reg(struct m0_net_buffer *nb, struct m0_fab__tm *tm);
308 static int libfab_buf_dom_dereg(struct m0_fab__buf *fbp);
309 static void libfab_pending_bufs_send(struct m0_fab__ep *ep);
310 static int libfab_target_notify(struct m0_fab__buf *buf);
311 static int libfab_conn_init(struct m0_fab__ep *ep, struct m0_fab__tm *ma,
312  struct m0_fab__buf *fbp);
313 static int libfab_conn_accept(struct m0_fab__ep *ep, struct m0_fab__tm *tm,
314  struct fi_info *info);
315 static int libfab_fab_ep_find(struct m0_fab__tm *tm, const char *name,
316  struct m0_net_ip_params *addr,
317  struct m0_fab__ep **ep);
318 static void libfab_ep_pton(struct m0_net_ip_addr *name, uint64_t *out);
319 static void libfab_txep_event_check(struct m0_fab__ep *txep,
320  struct m0_fab__active_ep *aep,
321  struct m0_fab__tm *tm);
322 static int libfab_txep_init(struct m0_fab__active_ep *aep,
323  struct m0_fab__tm *tm, void *ctx);
324 static int libfab_waitfd_bind(struct fid* fid, struct m0_fab__tm *tm,
325  void *ctx);
326 static int libfab_waitfd_unbind(struct fid* fid, struct m0_fab__tm *tm,
327  void *ctx);
328 static inline struct m0_fab__active_ep *libfab_aep_get(struct m0_fab__ep *ep);
329 static int libfab_ping_op(struct m0_fab__active_ep *ep, struct m0_fab__buf *fb);
330 static int libfab_bulk_op(struct m0_fab__active_ep *ep, struct m0_fab__buf *fb);
331 static inline bool libfab_is_verbs(struct m0_fab__tm *tm);
332 static int libfab_txbuf_list_add(struct m0_fab__tm *tm, struct m0_fab__buf *fb,
333  struct m0_fab__active_ep *aep);
334 static void libfab_bufq_process(struct m0_fab__tm *tm);
335 static uint32_t libfab_buf_token_get(struct m0_fab__tm *tm,
336  struct m0_fab__buf *fb);
337 static bool libfab_buf_invariant(const struct m0_fab__buf *buf);
338 
339 /* libfab init and fini() : initialized in motr init */
340 M0_INTERNAL int m0_net_libfab_init(void)
341 {
342  m0_net_xprt_register(&m0_net_libfab_xprt);
343  if (m0_streq(M0_DEFAULT_NETWORK, "LF"))
344  m0_net_xprt_default_set(&m0_net_libfab_xprt);
345  return M0_RC(0);
346 }
347 
348 M0_INTERNAL void m0_net_libfab_fini(void)
349 {
350  m0_net_xprt_deregister(&m0_net_libfab_xprt);
351 }
352 
356 static void libfab_straddr_gen(struct m0_net_ip_params *addr,
357  char *ip)
358 {
359  if (likely((addr->nip_fmt_pvt.ia.nia_family == M0_NET_IP_AF_INET) ||
360  (addr->nip_format == M0_NET_IP_LNET_FORMAT)))
361  inet_ntop(AF_INET, &addr->nip_ip_n.sn[0], ip,
362  LIBFAB_ADDR_LEN_MAX);
363  else if (addr->nip_fmt_pvt.ia.nia_family == M0_NET_IP_AF_INET6)
364  inet_ntop(AF_INET6, &addr->nip_ip_n.ln[0], ip,
365  LIBFAB_ADDR_LEN_MAX);
366  else
367  M0_LOG(M0_ERROR, "Family is not supported.");
368 }
369 
388 static int libfab_ep_addr_decode(struct m0_fab__ep *ep, const char *name)
389 {
390  M0_ENTRY("name=%s", name);
391  M0_PRE(name != NULL);
392 
393  if (name[0] == '\0')
394  return M0_ERR(-EPROTO);
395  else
396  return M0_RC(m0_net_ip_parse(name, &ep->fep_name));
397 }
398 
402 static inline void libfab_tm_lock(struct m0_fab__tm *tm)
403 {
404  m0_mutex_lock(&tm->ftm_ntm->ntm_mutex);
405 }
406 
410 static inline void libfab_tm_unlock(struct m0_fab__tm *tm)
411 {
412  m0_mutex_unlock(&tm->ftm_ntm->ntm_mutex);
413 }
414 
415 static inline int libfab_tm_trylock(struct m0_fab__tm *tm)
416 {
417  return m0_mutex_trylock(&tm->ftm_ntm->ntm_mutex);
418 }
419 
423 static inline void libfab_tm_evpost_lock(struct m0_fab__tm *tm)
424 {
425  m0_mutex_lock(&tm->ftm_evpost);
426 }
427 
431 static inline void libfab_tm_evpost_unlock(struct m0_fab__tm *tm)
432 {
433  m0_mutex_unlock(&tm->ftm_evpost);
434 }
435 
440 static inline bool libfab_tm_is_locked(const struct m0_fab__tm *tm)
441 {
442  return m0_mutex_is_locked(&tm->ftm_ntm->ntm_mutex);
443 }
444 
448 static void libfab_tm_event_post(struct m0_fab__tm *tm,
449  enum m0_net_tm_state state)
450 {
451  struct m0_net_end_point *listen = NULL;
452 
453  if (state == M0_NET_TM_STARTED) {
454  /* Check for LISTENING Passive endpoint */
455  listen = &tm->ftm_pep->fep_nep;
456  M0_ASSERT(listen != NULL);
457  }
458 
460  .nte_type = M0_NET_TEV_STATE_CHANGE,
461  .nte_next_state = state,
462  .nte_time = m0_time_now(),
463  .nte_ep = listen,
464  .nte_tm = tm->ftm_ntm,
465  });
466 }
467 
/*
 * Completes, with -ETIMEDOUT, every buffer queued on the transfer machine
 * whose nb_timeout is earlier than the current time.
 *
 * Must be called with the TM lock held and the TM invariant satisfied (both
 * are checked as preconditions). The next time-out scan is scheduled
 * FAB_BUF_TMOUT_CHK_INTERVAL from now by updating ftm->ftm_tmout_check.
 *
 * NOTE(review): source line 488 is absent from this listing (numbering
 * jumps from 487 to 489 inside the loop body); a statement may be missing
 * there -- confirm against the repository.
 */
472 static void libfab_tm_buf_timeout(struct m0_fab__tm *ftm)
473 {
474  struct m0_net_transfer_mc *net = ftm->ftm_ntm;
475  struct m0_net_buffer *nb;
476  struct m0_fab__buf *fb;
477  int i;
478  m0_time_t now = m0_time_now();
479 
480  M0_PRE(libfab_tm_is_locked(ftm));
481  M0_PRE(libfab_tm_invariant(ftm));
482 
483  ftm->ftm_tmout_check = m0_time_from_now(FAB_BUF_TMOUT_CHK_INTERVAL, 0);
/* Walk every queue of the network transfer machine. */
484  for (i = 0; i < ARRAY_SIZE(net->ntm_q); ++i) {
485  m0_tl_for(m0_net_tm, &net->ntm_q[i], nb) {
486  if (nb->nb_timeout < now) {
487  fb = nb->nb_xprt_private;
/* De-register the buffer, mark it timed out and complete it right away. */
489  libfab_buf_dom_dereg(fb);
490  fb->fb_state = FAB_BUF_TIMEDOUT;
491  libfab_buf_done(fb, -ETIMEDOUT, false);
492  }
493  } m0_tl_endfor;
494  }
495  M0_POST(libfab_tm_invariant(ftm));
496 }
497 
505 static void libfab_tm_buf_done(struct m0_fab__tm *ftm)
506 {
507  struct m0_fab__buf *buffer;
508  int nr = 0;
509 
510  M0_PRE(libfab_tm_is_locked(ftm) && libfab_tm_invariant(ftm));
511  m0_tl_teardown(fab_buf, &ftm->ftm_done, buffer) {
512  libfab_buf_complete(buffer);
513  nr++;
514  }
515 
516  if (nr > 0 && ftm->ftm_ntm->ntm_callback_counter == 0)
517  m0_chan_broadcast(&ftm->ftm_ntm->ntm_chan);
518  M0_POST(libfab_tm_invariant(ftm));
519 }
520 
530 static uint32_t libfab_handle_connect_request_events(struct m0_fab__tm *tm)
531 {
532  struct m0_fab__ep *ep = NULL;
533  struct fid_eq *eq;
534  struct fi_eq_err_entry eq_err = {};
535  struct fi_eq_cm_entry *cm_entry;
536  char entry[(sizeof(struct fi_eq_cm_entry) +
537  sizeof(struct m0_fab__conn_data))];
538  uint32_t event;
539  int rc;
540  struct m0_net_ip_addr addr;
541  int ret;
542 
543  eq = tm->ftm_pep->fep_listen->pep_res.fpr_eq;
544  do {
545  rc = fi_eq_read(eq, &event, &entry, sizeof(entry), 0);
546  ret = rc;
547  if (rc >= (int)sizeof(struct fi_eq_cm_entry) &&
548  event == FI_CONNREQ) {
549  cm_entry = (struct fi_eq_cm_entry *)entry;
550  addr.nia_n = *((struct m0_net_ip_params *)
551  (cm_entry->data));
552  rc = libfab_fab_ep_find(tm, NULL, &addr.nia_n, &ep);
553  if (rc == 0) {
554  rc = libfab_conn_accept(ep, tm, cm_entry->info);
555  if (rc != 0)
556  M0_LOG(M0_ERROR, "Conn accept fail %d",
557  rc);
558  } else
559  M0_LOG(M0_ERROR, "fab_ep_find fail rc=%d", rc);
560  fi_freeinfo(cm_entry->info);
561  } else if (rc == -FI_EAVAIL) {
562  rc = fi_eq_readerr(eq, &eq_err, 0);
563  if (rc != sizeof(eq_err))
564  M0_LOG(M0_ERROR, "fi_eq_readerr error =%s",
565  fi_strerror((int) -rc));
566  else
567  M0_LOG(M0_ERROR, "fi_eq_readerr prov err %d:%s",
568  eq_err.prov_errno,
569  fi_eq_strerror(eq, eq_err.prov_errno,
570  eq_err.err_data, NULL,
571  0));
572  } else if (rc != -EAGAIN)
573  /*
574  * For all other events, there is no error info available.
575  * Hence, all such events can be ignored.
576  */
577  M0_LOG(M0_ERROR, "Unexpected event tm=%p rc=%d", tm, rc);
578  } while (ret != -EAGAIN);
579  return 0;
580 }
581 
586 static void libfab_txep_event_check(struct m0_fab__ep *txep,
587  struct m0_fab__active_ep *aep,
588  struct m0_fab__tm *tm)
589 {
590  struct m0_fab__buf *fbp;
591  uint32_t event;
592  int rc;
593 
594  if (aep->aep_rx_state == FAB_CONNECTING) {
595  do {
596  rc = libfab_check_for_event(aep->aep_rx_res.frr_eq,
597  &event);
598  if (rc >= 0 && event == FI_CONNECTED) {
599  aep->aep_rx_state = FAB_CONNECTED;
600  if (txep == tm->ftm_pep)
601  txep->fep_connlink |=
602  FAB_CONNLINK_RXEP_RDY;
603  }
604  } while (rc != -EAGAIN);
605  }
606 
607  do {
608  rc = libfab_check_for_event(aep->aep_tx_res.ftr_eq, &event);
609  if (rc >= 0) {
610  if (event == FI_CONNECTED) {
611  aep->aep_tx_state = FAB_CONNECTED;
612  if (txep == tm->ftm_pep)
613  txep->fep_connlink |=
614  FAB_CONNLINK_TXEP_RDY;
615  else
616  txep->fep_connlink |=
617  FAB_CONNLINK_TXEP_RDY |
618  FAB_CONNLINK_RXEP_RDY;
619  } else if (event == FI_SHUTDOWN) {
620  /* Flush all events from rxep EQ. */
621  if (aep->aep_rx_res.frr_eq != NULL) {
622  while (libfab_check_for_event(
623  aep->aep_rx_res.frr_eq,
624  &event) != -EAGAIN);
625  }
626  /* Reset and reopen endpoint */
627  libfab_txep_init(aep, tm, txep);
628  }
629  } else if (rc == -ECONNREFUSED &&
630  aep->aep_tx_state == FAB_CONNECTING) {
631  libfab_txep_init(aep, tm, txep);
632  m0_tl_teardown(fab_sndbuf, &txep->fep_sndbuf, fbp) {
633  libfab_buf_done(fbp, rc, false);
634  }
635  }
636  } while (rc != -EAGAIN);
637  /* All other types of events can be ignored */
638 
639  if (txep->fep_connlink == FAB_CONNLINK_RDY_TO_SEND) {
640  libfab_pending_bufs_send(txep);
641  txep->fep_connlink = FAB_CONNLINK_PENDING_SEND_DONE;
642  }
643 
644  /*
645  * For version >= 1.12, the libfabric library does not return the error
646  * -111 (ECONNREFUSED) if the remote service has not yet started. Thus
647  * the connection request does not receive any reply and the endpoint
648  * keeps waiting in the FAB_CONNECTING state. To avoid this, the state
649  * of the endpoint is reset after a timeout of 5s to FAB_DISCONNECTED.
650  * Thus, when the next buffer is posted to the endpoint, it will again
651  * try to establish connection by sending out a new connection request.
652  */
653  if (aep->aep_tx_state == FAB_CONNECTING &&
654  m0_time_is_in_past(aep->aep_connecting_tmout)) {
655  M0_LOG(M0_DEBUG,"Reset Conn from %s to %s",
656  (char*)tm->ftm_pep->fep_name.nia_p,
657  (char*)txep->fep_name.nia_p);
658  libfab_txep_init(aep, tm, txep);
659  m0_tl_teardown(fab_sndbuf, &txep->fep_sndbuf, fbp) {
660  libfab_buf_done(fbp, -ECONNREFUSED, false);
661  }
662  }
663 }
664 
668 static void libfab_rxep_comp_read(struct fid_cq *cq, struct m0_fab__ep *ep,
669  struct m0_fab__tm *tm)
670 {
671  struct m0_fab__buf *fb = NULL;
672  uint32_t token[FAB_MAX_COMP_READ];
673  m0_bindex_t len[FAB_MAX_COMP_READ];
674  uint64_t data[FAB_MAX_COMP_READ];
675  int i;
676  int cnt;
677  uint32_t rem_token;
678 
679  if (cq != NULL) {
680  cnt = libfab_check_for_comp(cq, token, len, data);
681  for (i = 0; i < cnt; i++) {
682  fb = fab_bufhash_htable_lookup(
683  &tm->ftm_bufhash.bht_hash,
684  &token[i]);
685  if (fb != NULL) {
686  if (fb->fb_length == 0)
687  fb->fb_length = len[i];
688  fb->fb_ev_ep = ep;
689  libfab_buf_done(fb, 0, false);
690  }
691  if (data[i] != 0) {
692  rem_token = (uint32_t)data[i];
693  fb = fab_bufhash_htable_lookup(
694  &tm->ftm_bufhash.bht_hash,
695  &rem_token);
696  if (fb != NULL)
697  libfab_buf_done(fb, 0, false);
698  }
699  }
700  }
701 }
702 
/*
 * Reads the completions available on the transmit completion queue and
 * processes the buffer identified by each non-zero token.
 *
 * A buffer whose token has all M0_NET_QT_NR bits set is removed from the
 * buffer hash table and freed, after decrementing the bulk counter of its
 * active end-point and clearing aep_txq_full. Any other buffer has the
 * target notified and is completed with status 0.
 *
 * NOTE(review): source lines 733-735 are absent from this listing, so the
 * M0_IN() condition that opens the inner "if" is incomplete as shown --
 * restore it from the repository before compiling.
 */
706 static void libfab_txep_comp_read(struct fid_cq *cq, struct m0_fab__tm *tm)
707 {
708  struct m0_fab__active_ep *aep;
709  struct m0_fab__buf *fb = NULL;
710  uint32_t token[FAB_MAX_COMP_READ];
711  int i;
712  int cnt;
713 
714  cnt = libfab_check_for_comp(cq, token, NULL, NULL);
715  for (i = 0; i < cnt; i++) {
/* A zero token is never looked up in the hash table. */
716  if (token[i] != 0)
717  fb = fab_bufhash_htable_lookup(
718  &tm->ftm_bufhash.bht_hash,
719  &token[i]);
720  else
721  fb = NULL;
722  if (fb != NULL) {
723  aep = libfab_aep_get(fb->fb_txctx);
724  if ((fb->fb_token & M0_NET_QT_NR) == M0_NET_QT_NR) {
725  fab_bufhash_htable_del(
726  &tm->ftm_bufhash.bht_hash, fb);
727  M0_ASSERT(aep->aep_bulk_cnt);
728  --aep->aep_bulk_cnt;
729  aep->aep_txq_full = false;
730  m0_free(fb);
731  } else {
732  if (M0_IN(fb->fb_nb->nb_qtype,
/* NOTE(review): remainder of the condition (lines 733-735) not visible. */
736  M0_ASSERT(aep->aep_bulk_cnt >=
737  fb->fb_wr_cnt);
738  aep->aep_bulk_cnt -= fb->fb_wr_cnt;
739  aep->aep_txq_full = false;
740  }
741  libfab_target_notify(fb);
742  libfab_buf_done(fb, 0, false);
743  }
744  }
745  }
746 }
747 
/*
 * Polling loop of the transfer machine.
 *
 * Posts the M0_NET_TM_STARTED state-change event and then, until
 * tm->ftm_state becomes FAB_TM_SHUTDOWN, repeats:
 *  1. wait for activity with fi_trywait() followed by epoll_wait() on
 *     tm->ftm_epfd (retried while epoll_wait() fails with EINTR);
 *  2. acquire the TM lock with trylock, returning if shutdown is observed
 *     while doing so;
 *  3. service the listening event queue and the shared tx completion queue;
 *  4. if epoll reported an event that is not FAB_COMMON_Q_EVENT, service
 *     the end-point recorded in the event context;
 *  5. service the end-point at the head of the TM end-point list and move
 *     it to the tail;
 *  6. process the buffer queue, time out overdue buffers when the check is
 *     due, complete finished buffers and release the TM lock.
 *
 * NOTE(review): source lines 792, 796 and 855 are absent from this listing
 * (see the notes at the gaps below) -- restore them from the repository
 * before compiling.
 */
752 static void libfab_poller(struct m0_fab__tm *tm)
753 {
754  struct m0_net_end_point *net;
755  struct m0_fab__ev_ctx *ctx;
756  struct m0_fab__ep *xep;
757  struct m0_fab__active_ep *aep;
758  struct fid_cq *cq;
759  struct epoll_event ev;
760  int ev_cnt;
761  int ret;
762  int err;
763 
764  libfab_tm_event_post(tm, M0_NET_TM_STARTED);
765  while (tm->ftm_state != FAB_TM_SHUTDOWN) {
766  do {
767  ret = fi_trywait(tm->ftm_fab->fab_fab,
768  tm->ftm_fids.ftf_head,
769  tm->ftm_fids.ftf_cnt);
770  /*
771  * TBD : Add handling of other return values of
772  * fi_trywait() if it returns something other than
773  * -EAGAIN and 0. Also, observed that fi_trywait()
774  * returns -22(EINVAL) which is not mentioned in
775  * libfabric documentation, hence added it to the list
776  * of possible return values of fi_trywait().
777  */
778  if (!M0_IN(ret, (0, -EAGAIN, -EINVAL)))
779  M0_LOG(M0_ERROR, "Unexpected fi_trywait rc=%d",
780  ret);
781 
/* epoll_wait() is only entered when fi_trywait() returned 0. */
782  if (ret == 0) {
783  ret = epoll_wait(tm->ftm_epfd, &ev, 1,
784  FAB_WAIT_FD_TMOUT);
785  /*
786  * M0_ERR is omitted because we expect only one
787  * particular error, and this error gets
788  * handled by the loop.
789  */
790  err = ret < 0 ? -errno : 0;
791  if (!M0_IN(ret, (-1, 0, 1)))
/* NOTE(review): line 792 (start of the statement below) is not visible. */
793  "Unexpected epoll_wait rc=%d",
794  ret);
795  if (ret == -1 && err != -EINTR)
/* NOTE(review): line 796 (start of the statement below) is not visible. */
797  "Unexpected epoll_wait err=%d",
798  err);
799  ev_cnt = ret > 0 ? ret : 0;
800  } else {
801  ev_cnt = 0;
802  err = 0;
803  }
804  } while (err == -EINTR);
805 
/*
 * Spin on trylock of the TM mutex. Both the shutdown check and the trylock
 * are made while holding ftm_endlock; the poller returns as soon as
 * shutdown is observed.
 */
806  while (1) {
807  m0_mutex_lock(&tm->ftm_endlock);
808  if (tm->ftm_state == FAB_TM_SHUTDOWN) {
809  m0_mutex_unlock(&tm->ftm_endlock);
810  return;
811  }
812 
813  ret = libfab_tm_trylock(tm);
814  m0_mutex_unlock(&tm->ftm_endlock);
815  if (ret == 0) {
816  /*
817  * Got tm lock.
818  * Let's continue processing events.
819  */
820  break;
821  }
822  }
823 
824  M0_ASSERT(libfab_tm_is_locked(tm) && libfab_tm_invariant(tm));
825 
826  /* Check the common queue of the transfer machine for events */
827  libfab_handle_connect_request_events(tm);
828  libfab_txep_comp_read(tm->ftm_tx_cq, tm);
829 
830  if (ev_cnt > 0) {
831  ctx = ev.data.ptr;
832  if (ctx->evctx_type != FAB_COMMON_Q_EVENT) {
833  /*
834  * Check the private queue of the
835  * endpoint for events.
836  */
837  xep = ctx->evctx_ep;
838  aep = libfab_aep_get(xep);
839  libfab_txep_event_check(xep, aep, tm);
840  cq = aep->aep_rx_res.frr_cq;
841  libfab_rxep_comp_read(cq, xep, tm);
842  }
843  }
844 
845  /*
846  * Process events on private queue of the endpoints in round-
847  * robin fashion.
848  */
849  net = m0_nep_tlist_pop(&tm->ftm_ntm->ntm_end_points);
850  M0_ASSERT(net != NULL);
851  /*
852  * TM lock can be released by libfab_txep_event_check(), make
853  * sure the end-point stays alive.
854  */
/*
 * NOTE(review): line 855 is not visible; presumably it takes the reference
 * that m0_ref_put() below releases -- confirm against the repository.
 */
856  m0_nep_tlist_add_tail(&tm->ftm_ntm->ntm_end_points, net);
857  xep = libfab_ep(net);
858  aep = libfab_aep_get(xep);
859  libfab_txep_event_check(xep, aep, tm);
860  cq = aep->aep_rx_res.frr_cq;
861  libfab_rxep_comp_read(cq, xep, tm);
862  /* Release, with TM lock already held. */
863  m0_ref_put(&net->nep_ref);
864 
865  libfab_bufq_process(tm);
866  if (m0_time_is_in_past(tm->ftm_tmout_check))
867  libfab_tm_buf_timeout(tm);
868  libfab_tm_buf_done(tm);
869 
870  M0_ASSERT(libfab_tm_invariant(tm));
871  libfab_tm_unlock(tm);
872  }
873 }
874 
878 static inline struct m0_fab__ep *libfab_ep(struct m0_net_end_point *net)
879 {
880  return net != NULL ? net->nep_xprt_pvt : NULL;
881 }
882 
887 static bool libfab_ep_find_by_num(struct m0_net_ip_addr *addr,
888  struct m0_net_transfer_mc *ntm,
889  struct m0_fab__ep **ep)
890 {
891  struct m0_net_end_point *net;
892 
893  net = m0_tl_find(m0_nep, net, &ntm->ntm_end_points,
894  m0_net_ip_addr_eq(&(libfab_ep(net))->fep_name,
895  addr, true));
896 
897  *ep = net != NULL ? libfab_ep(net) : NULL;
898 
899  return net != NULL;
900 }
901 
906 static bool libfab_ep_find_by_str(const char *name,
907  struct m0_net_transfer_mc *ntm,
908  struct m0_fab__ep **ep)
909 {
910  struct m0_net_end_point *net;
911 
912  net = m0_tl_find(m0_nep, net, &ntm->ntm_end_points,
913  strcmp((libfab_ep(net))->fep_name.nia_p, name) == 0);
914 
915  *ep = net != NULL ? libfab_ep(net) : NULL;
916 
917  return net != NULL;
918 }
919 
929 static int libfab_ep_find(struct m0_net_transfer_mc *tm, const char *name,
930  struct m0_net_ip_params *addr,
931  struct m0_net_end_point **epp)
932 {
933  struct m0_fab__ep *ep;
934  struct m0_fab__active_ep *aep;
935  struct m0_net_ip_addr net_ip = {};
936  struct m0_fab__tm *ma;
937  char *wc = NULL;
938  int rc = 0;
939  bool found = false;
940 
941  if (addr != NULL)
942  net_ip.nia_n = *addr;
943  M0_ASSERT(libfab_tm_is_locked(tm->ntm_xprt_private));
944 
945  if (likely(addr != NULL))
946  found = libfab_ep_find_by_num(&net_ip, tm, &ep);
947  else
948  found = libfab_ep_find_by_str(name, tm, &ep);
949 
950  if (!found) {
951  M0_ASSERT(name != NULL || addr != NULL);
952  if (name != NULL)
953  rc = libfab_ep_create(tm, name, addr, epp);
954  else {
955  m0_net_ip_print(&net_ip);
956  rc = libfab_ep_create(tm, net_ip.nia_p, addr, epp);
957  }
958  } else {
959  *epp = &ep->fep_nep;
960  if (name != NULL && addr != NULL) {
961  wc = strchr(name,'*');
962  /*
963  * In lnet format, the epname can contain a wildchar(*)
964  * which can be present instead of numeric tmid
965  */
966  if (wc != NULL &&
967  ep->fep_name.nia_n.nip_port !=
968  net_ip.nia_n.nip_port) {
969  ep->fep_name.nia_n.nip_ip_n.sn[0] =
970  net_ip.nia_n.nip_ip_n.sn[0];
971  ep->fep_name.nia_n.nip_port =
972  net_ip.nia_n.nip_port;
973  ep->fep_name.nia_n.nip_fmt_pvt.la.nla_tmid =
974  net_ip.nia_n.nip_fmt_pvt.la.nla_tmid;
975  libfab_ep_pton(&ep->fep_name,
976  &ep->fep_name_n);
977  aep = libfab_aep_get(ep);
978  ma = tm->ntm_xprt_private;
979  if (aep->aep_tx_state == FAB_CONNECTED)
980  rc = libfab_txep_init(aep, ma, ep);
981  }
982  }
983 
984  if (rc == 0)
985  libfab_ep_get(ep);
986 
987  }
988 
989  return M0_RC(rc);
990 }
991 
995 static int libfab_ep_create(struct m0_net_transfer_mc *tm, const char *name,
996  struct m0_net_ip_params *addr,
997  struct m0_net_end_point **epp)
998 {
999  struct m0_fab__tm *ma = tm->ntm_xprt_private;
1000  struct m0_fab__ep *ep = NULL;
1001  int rc;
1002 
1003  M0_ENTRY("name=%s", name);
1004  M0_PRE(name != NULL);
1005 
1006  M0_ALLOC_PTR(ep);
1007  if (ep == NULL)
1008  return M0_ERR(-ENOMEM);
1009 
1010  M0_ALLOC_PTR(ep->fep_aep);
1011  if (ep->fep_aep == NULL) {
1012  m0_free(ep);
1013  return M0_ERR(-ENOMEM);
1014  }
1015 
1016  ep->fep_listen = NULL;
1017 
1018  rc = libfab_ep_addr_decode(ep, name);
1019  if (rc != 0) {
1020  libfab_aep_param_free(ep->fep_aep, ma);
1021  m0_free(ep);
1022  return M0_ERR(rc);
1023  }
1024 
1025  /*
1026  * Due to wildchar '*' as tmid in lnet format, we need to make sure that
1027  * tmid and port are correctly reconstructed.
1028  */
1029  if (addr != NULL && addr->nip_format == M0_NET_IP_LNET_FORMAT &&
1030  addr->nip_fmt_pvt.la.nla_autotm) {
1031  ep->fep_name.nia_n.nip_port = addr->nip_port;
1032  ep->fep_name.nia_n.nip_fmt_pvt.la.nla_tmid =
1033  addr->nip_fmt_pvt.la.nla_tmid;
1034  }
1035 
1036  rc = libfab_active_ep_create(ep, ma);
1037  if (rc != 0) {
1038  libfab_aep_param_free(ep->fep_aep, ma);
1039  m0_free(ep);
1040  return M0_ERR(rc);
1041  }
1042 
1043  fab_sndbuf_tlist_init(&ep->fep_sndbuf);
1044  *epp = &ep->fep_nep;
1045  return M0_RC(0);
1046 }
1047 
1051 static int libfab_tm_res_init(struct m0_fab__tm *tm)
1052 {
1053  struct m0_fab__fab *fab;
1054  struct m0_fab__passive_ep *pep;
1055  struct fi_cq_attr cq_attr = {};
1056  int rc = 0;
1057 
1058  M0_PRE(tm != NULL);
1059 
1060  pep = tm->ftm_pep->fep_listen;
1061  fab = tm->ftm_fab;
1062  /* Initialise completion queues for tx */
1063  cq_attr.wait_obj = FI_WAIT_FD;
1064  cq_attr.wait_cond = FI_CQ_COND_NONE;
1065  cq_attr.format = FI_CQ_FORMAT_DATA;
1066  cq_attr.size = FAB_MAX_TX_CQ_EV;
1067  rc = fi_cq_open(fab->fab_dom, &cq_attr, &tm->ftm_tx_cq, NULL);
1068  if (rc != 0)
1069  return M0_ERR(rc);
1070 
1071  /* Initialize and bind resources to tx ep */
1072  tm->ftm_txcq_ctx.evctx_type = FAB_COMMON_Q_EVENT;
1073  tm->ftm_txcq_ctx.evctx_ep = NULL;
1074  tm->ftm_txcq_ctx.evctx_dbg = "txep cq";
1075  rc = libfab_waitfd_bind(&tm->ftm_tx_cq->fid, tm, &tm->ftm_txcq_ctx);
1076  if (rc != 0)
1077  return M0_ERR(rc);
1078 
1079  return M0_RC(libfab_txep_init(pep->pep_aep, tm, tm->ftm_pep));
1080 }
1081 
1086 static int libfab_ep_txres_init(struct m0_fab__active_ep *aep,
1087  struct m0_fab__tm *tm, void *ctx)
1088 {
1089  struct fi_eq_attr eq_attr = {};
1090  struct m0_fab__fab *fab;
1091  int rc;
1092 
1093  fab = tm->ftm_fab;
1094 
1095  /* Bind the ep to tx completion queue */
1096  rc = fi_ep_bind(aep->aep_txep, &tm->ftm_tx_cq->fid,
1097  FI_TRANSMIT | FI_RECV | FI_SELECTIVE_COMPLETION);
1098  if (rc != 0)
1099  return M0_ERR(rc);
1100 
1101  /* Initialise and bind event queue */
1102  eq_attr.wait_obj = FI_WAIT_FD;
1103  eq_attr.size = FAB_MAX_AEP_EQ_EV;
1104  rc = fi_eq_open(fab->fab_fab, &eq_attr, &aep->aep_tx_res.ftr_eq, NULL);
1105  if (rc != 0)
1106  return M0_ERR(rc);
1107 
1108  aep->aep_tx_res.ftr_ctx.evctx_type = FAB_PRIVATE_Q_EVENT;
1109  aep->aep_tx_res.ftr_ctx.evctx_ep = ctx;
1110  aep->aep_tx_res.ftr_ctx.evctx_dbg = "txep eq";
1111  rc = libfab_waitfd_bind(&aep->aep_tx_res.ftr_eq->fid, tm,
1112  &aep->aep_tx_res.ftr_ctx);
1113  if (rc != 0)
1114  return M0_ERR(rc);
1115 
1116  rc = fi_ep_bind(aep->aep_txep, &aep->aep_tx_res.ftr_eq->fid, 0);
1117 
1118  return rc != 0 ? M0_ERR(rc) : M0_RC(0);
1119 }
1120 
1125 static int libfab_ep_rxres_init(struct m0_fab__active_ep *aep,
1126  struct m0_fab__tm *tm, void *ctx)
1127 {
1128  struct fi_cq_attr cq_attr = {};
1129  struct fi_eq_attr eq_attr = {};
1130  struct m0_fab__fab *fab;
1131  int rc;
1132 
1133  fab = tm->ftm_fab;
1134 
1135  /* Initialise and bind completion queues for rx */
1136  cq_attr.wait_obj = FI_WAIT_FD;
1137  cq_attr.wait_cond = FI_CQ_COND_NONE;
1138  cq_attr.format = FI_CQ_FORMAT_DATA;
1139  cq_attr.size = FAB_MAX_RX_CQ_EV;
1140  rc = fi_cq_open(fab->fab_dom, &cq_attr, &aep->aep_rx_res.frr_cq, NULL);
1141  if (rc != 0)
1142  return M0_ERR(rc);
1143 
1144  aep->aep_rx_res.frr_cq_ctx.evctx_type = FAB_PRIVATE_Q_EVENT;
1145  aep->aep_rx_res.frr_cq_ctx.evctx_ep = ctx;
1146  aep->aep_rx_res.frr_cq_ctx.evctx_dbg = "rxep cq";
1147  rc = libfab_waitfd_bind(&aep->aep_rx_res.frr_cq->fid, tm,
1148  &aep->aep_rx_res.frr_cq_ctx);
1149  if (rc != 0)
1150  return M0_ERR(rc);
1151 
1152  rc = fi_ep_bind(aep->aep_rxep, &tm->ftm_tx_cq->fid,
1153  FI_TRANSMIT | FI_SELECTIVE_COMPLETION) ? :
1154  fi_ep_bind(aep->aep_rxep, &aep->aep_rx_res.frr_cq->fid, FI_RECV);
1155  if (rc != 0)
1156  return M0_ERR(rc);
1157 
1158  /* Initialise and bind event queue */
1159  eq_attr.wait_obj = FI_WAIT_FD;
1160  eq_attr.size = FAB_MAX_AEP_EQ_EV;
1161  aep->aep_rx_res.frr_eq_ctx.evctx_type = FAB_PRIVATE_Q_EVENT;
1162  aep->aep_rx_res.frr_eq_ctx.evctx_ep = ctx;
1163  aep->aep_rx_res.frr_eq_ctx.evctx_dbg = "rxep eq";
1164  rc = fi_eq_open(fab->fab_fab, &eq_attr, &aep->aep_rx_res.frr_eq,
1165  NULL) ? :
1166  libfab_waitfd_bind(&aep->aep_rx_res.frr_eq->fid, tm,
1167  &aep->aep_rx_res.frr_eq_ctx) ? :
1168  fi_ep_bind(aep->aep_rxep, &aep->aep_rx_res.frr_eq->fid, 0) ? :
1169  fi_ep_bind(aep->aep_rxep, &tm->ftm_rctx->fid, 0);
1170 
1171  return rc != 0 ? M0_ERR(rc) : M0_RC(0);
1172 }
1173 
1178 static int libfab_pep_res_init(struct m0_fab__passive_ep *pep,
1179  struct m0_fab__tm *tm, void *ctx)
1180 {
1181  struct fi_eq_attr eq_attr = {};
1182  int rc = 0;
1183 
1184  /* Initialise and bind event queue */
1185  eq_attr.wait_obj = FI_WAIT_FD;
1186  eq_attr.size = FAB_MAX_PEP_EQ_EV;
1187  rc = fi_eq_open(tm->ftm_fab->fab_fab, &eq_attr, &pep->pep_res.fpr_eq,
1188  NULL);
1189  if (rc != 0)
1190  return M0_ERR(rc);
1191 
1192  pep->pep_res.fpr_ctx.evctx_type = FAB_COMMON_Q_EVENT;
1193  pep->pep_res.fpr_ctx.evctx_ep = ctx;
1194  pep->pep_res.fpr_ctx.evctx_dbg = "pep eq";
1195  rc = libfab_waitfd_bind(&pep->pep_res.fpr_eq->fid, tm,
1196  &pep->pep_res.fpr_ctx) ? :
1197  fi_pep_bind(pep->pep_pep, &pep->pep_res.fpr_eq->fid, 0);
1198 
1199  return rc != 0 ? M0_ERR(rc) : M0_RC(0);
1200 }
1201 
1205 static int libfab_conn_accept(struct m0_fab__ep *ep, struct m0_fab__tm *tm,
1206  struct fi_info *info)
1207 {
1208  struct m0_fab__active_ep *aep;
1209  struct fid_domain *dp;
1210  int rc;
1211 
1212  M0_ENTRY("from ep=%s -> tm=%s", (char*)ep->fep_name.nia_p,
1213  (char*)tm->ftm_pep->fep_name.nia_p);
1214 
1215  aep = libfab_aep_get(ep);
1216  dp = tm->ftm_fab->fab_dom;
1217 
1218  if (aep->aep_rxep != NULL) {
1219  rc = fi_close(&aep->aep_rxep->fid);
1220  if (rc != 0)
1221  M0_LOG(M0_ERROR, "ep close = %d",rc);
1222  libfab_ep_rxres_free(&aep->aep_rx_res, tm);
1223  }
1224  aep->aep_rx_state = FAB_NOT_CONNECTED;
1225  ep->fep_connlink = FAB_CONNLINK_DOWN;
1226 
1227  rc = fi_endpoint(dp, info, &aep->aep_rxep, NULL) ? :
1228  libfab_ep_rxres_init(aep, tm, ep) ? :
1229  fi_enable(aep->aep_rxep) ? :
1230  fi_accept(aep->aep_rxep, NULL, 0);
1231 
1232  if (rc != 0) {
1233  libfab_aep_param_free(aep, tm);
1234  return M0_ERR(rc);
1235  }
1236 
1237  aep->aep_rx_state = FAB_CONNECTING;
1238 
1239  return M0_RC(0);
1240 }
1241 
1245 static int libfab_active_ep_create(struct m0_fab__ep *ep, struct m0_fab__tm *tm)
1246 {
1247  struct m0_net_end_point *net;
1248  struct m0_fab__active_ep *aep;
1249  int rc;
1250 
1251  M0_ASSERT(libfab_tm_is_locked(tm));
1252  aep = ep->fep_aep;
1253  rc = libfab_txep_init(aep, tm, ep);
1254  if (rc != 0) {
1255  libfab_aep_param_free(aep, tm);
1256  return M0_ERR(rc);
1257  }
1258 
1259  net = &ep->fep_nep;
1260  net->nep_xprt_pvt = ep;
1261  net->nep_tm = tm->ftm_ntm;
1262  libfab_ep_pton(&ep->fep_name, &ep->fep_name_n);
1263  m0_nep_tlink_init_at_tail(net, &tm->ftm_ntm->ntm_end_points);
1264  net->nep_addr = (const char *)(&ep->fep_name.nia_p);
1265  m0_ref_init(&ep->fep_nep.nep_ref, 1, &libfab_ep_release);
1266 
1267  return M0_RC(0);
1268 }
1269 
1274 static int libfab_passive_ep_create(struct m0_fab__ep *ep,
1275  struct m0_fab__tm *tm)
1276 {
1277  struct m0_fab__passive_ep *pep;
1278  struct fi_info *hints;
1279  struct fi_info *fi;
1280  enum m0_fab__prov_type idx;
1281  int rc;
1282  int rx_size;
1283  char addr[LIBFAB_ADDR_LEN_MAX] = {};
1284  char port[LIBFAB_PORT_LEN_MAX] = {};
1285 
1286  M0_ENTRY("ep=%s nip_ip_n=[0x%" PRIx64 ",0x%" PRIx64 "] port=%d",
1287  (char*)ep->fep_name.nia_p,
1288  ep->fep_name.nia_n.nip_ip_n.ln[0],
1289  ep->fep_name.nia_n.nip_ip_n.ln[1],
1290  (int)ep->fep_name.nia_n.nip_port);
1291 
1292  M0_ALLOC_PTR(ep->fep_listen);
1293  if (ep->fep_listen == NULL)
1294  return M0_ERR(-ENOMEM);
1295  M0_ALLOC_PTR(ep->fep_listen->pep_aep);
1296  if (ep->fep_listen->pep_aep == NULL) {
1297  m0_free(ep->fep_listen);
1298  return M0_ERR(-ENOMEM);
1299  }
1300 
1301  pep = ep->fep_listen;
1302  ep->fep_listen->pep_aep->aep_rxep = NULL;
1303  ep->fep_listen->pep_aep->aep_txep = NULL;
1304 
1305  libfab_straddr_gen(&ep->fep_name.nia_n, addr);
1306  snprintf(port, ARRAY_SIZE(port), "%d", ep->fep_name.nia_n.nip_port);
1307 
1308  hints = fi_allocinfo();
1309  if (hints == NULL) {
1310  m0_free(pep->pep_aep);
1311  m0_free(pep);
1312  return M0_ERR(-ENOMEM);
1313  }
1314 
1315  hints->ep_attr->type = FI_EP_MSG;
1316  hints->caps = FI_MSG | FI_RMA;
1317  hints->mode |= FI_RX_CQ_DATA;
1318  hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED |
1319  FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
1320  hints->domain_attr->cq_data_size = 4;
1321 
1322  for (idx = 0; idx < FAB_FABRIC_PROV_MAX; idx++) {
1323  hints->fabric_attr->prov_name = (char *)providers[idx];
1324  rc = fi_getinfo(LIBFAB_VERSION, addr, port, FI_SOURCE, hints,
1325  &fi);
1326  if (rc == 0)
1327  break;
1328  }
1329 
1330  if (rc != 0)
1331  return M0_ERR(rc);
1332 
1333  M0_ASSERT(idx < FAB_FABRIC_PROV_MAX);
1334 
1335  M0_LOG(M0_DEBUG, "tm=%s Provider selected %s",
1336  (char*)ep->fep_name.nia_p, fi->fabric_attr->prov_name);
1337  hints->fabric_attr->prov_name = NULL;
1338  tm->ftm_fab->fab_fi = fi;
1339  tm->ftm_fab->fab_prov = idx;
1340  tm->ftm_fab->fab_max_iov = min32u(fi->tx_attr->iov_limit,
1341  fi->tx_attr->rma_iov_limit);
1342  fi_freeinfo(hints);
1343 
1344  M0_ALLOC_ARR(tm->ftm_rem_iov, tm->ftm_fab->fab_max_iov);
1345  M0_ALLOC_ARR(tm->ftm_loc_iov, tm->ftm_fab->fab_max_iov);
1346  if (tm->ftm_rem_iov == NULL || tm->ftm_loc_iov == NULL) {
1347  m0_free(tm->ftm_rem_iov);
1348  m0_free(tm->ftm_loc_iov);
1349  return M0_ERR(-ENOMEM);
1350  }
1351 
1352  rc = fi_fabric(tm->ftm_fab->fab_fi->fabric_attr, &tm->ftm_fab->fab_fab,
1353  NULL) ? :
1354  libfab_waitfd_init(tm) ? :
1355  fi_passive_ep(tm->ftm_fab->fab_fab, tm->ftm_fab->fab_fi,
1356  &pep->pep_pep, NULL) ? :
1357  libfab_pep_res_init(pep, tm, ep) ? :
1358  fi_listen(pep->pep_pep) ? :
1359  fi_domain(tm->ftm_fab->fab_fab, tm->ftm_fab->fab_fi,
1360  &tm->ftm_fab->fab_dom, NULL);
1361 
1362  if (rc != 0) {
1363  libfab_pep_param_free(pep, tm);
1364  return M0_ERR(rc);
1365  }
1366 
1367  rx_size = tm->ftm_fab->fab_fi->rx_attr->size;
1368  tm->ftm_fab->fab_fi->rx_attr->size = FAB_MAX_SRX_SIZE;
1369  rc = fi_srx_context(tm->ftm_fab->fab_dom, tm->ftm_fab->fab_fi->rx_attr,
1370  &tm->ftm_rctx, NULL);
1371  tm->ftm_fab->fab_fi->rx_attr->size = rx_size;
1372  if (rc != 0) {
1373  M0_LOG(M0_ERROR," \n fi_srx_context = %d \n ", rc);
1374  libfab_pep_param_free(pep, tm);
1375  return M0_ERR(rc);
1376  }
1377 
1378  rc = libfab_tm_res_init(tm);
1379  if (rc != 0) {
1380  M0_LOG(M0_ERROR," \n libfab_tm_res_init = %d \n ", rc);
1381  libfab_pep_param_free(pep, tm);
1382  return M0_ERR(rc);
1383  }
1384 
1385  fab_sndbuf_tlist_init(&ep->fep_sndbuf);
1386  m0_ref_init(&tm->ftm_pep->fep_nep.nep_ref, 1, &libfab_ep_release);
1387  libfab_ep_get(tm->ftm_pep);
1388 
1389  return M0_RC(0);
1390 }
1391 
1395 static int libfab_pep_res_free(struct m0_fab__pep_res *pep_res,
1396  struct m0_fab__tm *tm)
1397 {
1398  int rc = 0;
1399 
1400  if (pep_res->fpr_eq != NULL) {
1401  rc = libfab_waitfd_unbind(&pep_res->fpr_eq->fid, tm,
1402  &pep_res->fpr_ctx);
1403  if (rc != 0)
1404  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1405  rc = fi_close(&pep_res->fpr_eq->fid);
1406  if (rc != 0)
1407  M0_LOG(M0_ERROR, "fpr_eq fi_close ret=%d fid=%d",
1408  rc, (int)pep_res->fpr_eq->fid.fclass);
1409  pep_res->fpr_eq = NULL;
1410  }
1411 
1412  return M0_RC(rc);
1413 }
1414 
1418 static int libfab_ep_txres_free(struct m0_fab__tx_res *tx_res,
1419  struct m0_fab__tm *tm)
1420 {
1421  int rc = 0;
1422 
1423  if (tx_res->ftr_eq != NULL) {
1424  rc = libfab_waitfd_unbind(&tx_res->ftr_eq->fid, tm,
1425  &tx_res->ftr_ctx);
1426  if (rc != 0)
1427  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1428  rc = fi_close(&tx_res->ftr_eq->fid);
1429  if (rc != 0)
1430  M0_LOG(M0_ERROR, "ftr_eq fi_close ret=%d fid=%d",
1431  rc, (int)tx_res->ftr_eq->fid.fclass);
1432  tx_res->ftr_eq = NULL;
1433  }
1434 
1435  return M0_RC(rc);
1436 }
1437 
1441 static int libfab_ep_rxres_free(struct m0_fab__rx_res *rx_res,
1442  struct m0_fab__tm *tm)
1443 {
1444  int rc = 0;
1445 
1446  if (rx_res->frr_eq != NULL) {
1447  rc = libfab_waitfd_unbind(&rx_res->frr_eq->fid, tm,
1448  &rx_res->frr_eq_ctx);
1449  if (rc != 0)
1450  M0_LOG(M0_ERROR,"epoll_ctl_del failed %d", rc);
1451  rc = fi_close(&rx_res->frr_eq->fid);
1452  if (rc != 0)
1453  M0_LOG(M0_ERROR, "frr_eq fi_close ret=%d fid=%d",
1454  rc, (int)rx_res->frr_eq->fid.fclass);
1455  rx_res->frr_eq = NULL;
1456  }
1457 
1458  if (rx_res->frr_cq != NULL) {
1459  rc = libfab_waitfd_unbind(&rx_res->frr_cq->fid, tm,
1460  &rx_res->frr_cq_ctx);
1461  if (rc != 0)
1462  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1463  rc = fi_close(&rx_res->frr_cq->fid);
1464  if (rc != 0)
1465  M0_LOG(M0_ERROR, "frr_cq fi_close ret=%d fid=%d",
1466  rc, (int)rx_res->frr_cq->fid.fclass);
1467  rx_res->frr_cq = NULL;
1468  }
1469 
1470  return M0_RC(rc);
1471 }
1472 
1476 static int libfab_aep_param_free(struct m0_fab__active_ep *aep,
1477  struct m0_fab__tm *tm)
1478 {
1479  int rc = 0;
1480 
1481  if (aep == NULL)
1482  return M0_RC(0);
1483  if (aep->aep_txep != NULL) {
1484  rc = fi_close(&aep->aep_txep->fid);
1485  if (rc != 0)
1486  M0_LOG(M0_ERROR, "aep_txep fi_close ret=%d fid=%d",
1487  rc, (int)aep->aep_txep->fid.fclass);
1488  aep->aep_txep = NULL;
1489  }
1490 
1491  if (aep->aep_rxep != NULL) {
1492  rc = fi_close(&aep->aep_rxep->fid);
1493  if (rc != 0)
1494  M0_LOG(M0_ERROR, "aep_rxep fi_close ret=%d fid=%d",
1495  rc, (int)aep->aep_rxep->fid.fclass);
1496  aep->aep_rxep = NULL;
1497  }
1498 
1499  rc = libfab_ep_txres_free(&aep->aep_tx_res, tm);
1500  if (rc != 0)
1501  M0_LOG(M0_ERROR, "ep_txres_free failed %d", rc);
1502 
1503  rc = libfab_ep_rxres_free(&aep->aep_rx_res, tm);
1504  if (rc != 0)
1505  M0_LOG(M0_ERROR, "ep_rxres_free failed %d", rc);
1506 
1507  m0_free(aep);
1508 
1509  return M0_RC(rc);
1510 }
1511 
1515 static int libfab_pep_param_free(struct m0_fab__passive_ep *pep,
1516  struct m0_fab__tm *tm)
1517 {
1518  int rc = 0;
1519 
1520  if (pep == NULL)
1521  return M0_RC(0);
1522 
1523  if (pep->pep_pep != NULL) {
1524  rc = fi_close(&pep->pep_pep->fid);
1525  if (rc != 0)
1526  M0_LOG(M0_ERROR, "fep_pep fi_close ret=%d fid=%d",
1527  rc, (int)pep->pep_pep->fid.fclass);
1528  pep->pep_pep = NULL;
1529  }
1530 
1531  rc = libfab_aep_param_free(pep->pep_aep, tm);
1532  if (rc != 0)
1533  M0_LOG(M0_ERROR, "aep_param_free failed %d", rc);
1534 
1535  rc = libfab_pep_res_free(&pep->pep_res, tm);
1536  if (rc != 0)
1537  M0_LOG(M0_ERROR, "pep_res_free failed %d", rc);
1538 
1539  m0_free(pep);
1540 
1541  return M0_RC(rc);
1542 }
1543 
1547 static int libfab_ep_param_free(struct m0_fab__ep *ep, struct m0_fab__tm *tm)
1548 {
1549  int rc;
1550 
1551  if (ep == NULL)
1552  return M0_RC(0);
1553 
1554  rc = libfab_pep_param_free(ep->fep_listen, tm) ? :
1555  libfab_aep_param_free(ep->fep_aep, tm);
1556 
1557  if (rc != 0)
1558  return M0_ERR(rc);
1559  m0_free(ep);
1560  return M0_RC(0);
1561 }
1562 
1566 static int libfab_tm_param_free(struct m0_fab__tm *tm)
1567 {
1568  struct m0_fab__bulk_op *op;
1569  struct m0_net_end_point *net;
1570  struct m0_fab__ep *xep;
1571  struct m0_fab__buf *fbp;
1572  int rc;
1573 
1574  if (tm == NULL)
1575  return M0_RC(0);
1576 
1577  if (tm->ftm_poller.t_func != NULL) {
1578  m0_thread_join(&tm->ftm_poller);
1579  m0_thread_fini(&tm->ftm_poller);
1580  }
1581 
1582  M0_ASSERT(libfab_tm_is_locked(tm));
1583  m0_tl_teardown(m0_nep, &tm->ftm_ntm->ntm_end_points, net) {
1584  xep = libfab_ep(net);
1585  rc = libfab_ep_param_free(xep, tm);
1586  }
1587  M0_ASSERT(m0_nep_tlist_is_empty(&tm->ftm_ntm->ntm_end_points));
1588  tm->ftm_ntm->ntm_ep = NULL;
1589 
1590  if (tm->ftm_rctx != NULL) {
1591  rc = fi_close(&tm->ftm_rctx->fid);
1592  if (rc != 0)
1593  M0_LOG(M0_ERROR, "ftm_rctx fi_close ret=%d fid=%d",
1594  rc, (int)tm->ftm_rctx->fid.fclass);
1595  tm->ftm_rctx = NULL;
1596  }
1597 
1598  if (tm->ftm_tx_cq != NULL) {
1599  rc = libfab_waitfd_unbind(&tm->ftm_tx_cq->fid, tm,
1600  &tm->ftm_txcq_ctx);
1601  if (rc != 0)
1602  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1603  rc = fi_close(&tm->ftm_tx_cq->fid);
1604  if (rc != 0)
1605  M0_LOG(M0_ERROR, "tx_cq fi_close ret=%d fid=%d",
1606  rc, (int)tm->ftm_tx_cq->fid.fclass);
1607  tm->ftm_tx_cq = NULL;
1608  }
1609 
1610  close(tm->ftm_epfd);
1611  m0_free(tm->ftm_fids.ftf_head);
1612  m0_free(tm->ftm_fids.ftf_ctx);
1613  m0_free(tm->ftm_rem_iov);
1614  m0_free(tm->ftm_loc_iov);
1615 
1616  m0_htable_for(fab_bufhash, fbp, &tm->ftm_bufhash.bht_hash) {
1617  fab_bufhash_htable_del(&tm->ftm_bufhash.bht_hash, fbp);
1618  } m0_htable_endfor;
1619  fab_bufhash_htable_fini(&tm->ftm_bufhash.bht_hash);
1620 
1621  m0_tl_teardown(fab_bulk, &tm->ftm_bulk, op) {
1622  m0_free(op);
1623  }
1624  fab_bulk_tlist_fini(&tm->ftm_bulk);
1625 
1626  return M0_RC(0);
1627 }
1628 
1633 static int libfab_waitfd_init(struct m0_fab__tm *tm)
1634 {
1635  M0_PRE(tm->ftm_epfd == -1);
1636 
1637  tm->ftm_epfd = epoll_create(1);
1638  if (tm->ftm_epfd < 0)
1639  return M0_ERR(-errno);
1640 
1641  return M0_RC(0);
1642 }
1643 
1647 static inline struct m0_fab__tm *libfab_buf_tm(struct m0_fab__buf *buf)
1648 {
1649  return buf->fb_nb->nb_tm->ntm_xprt_private;
1650 }
1651 
1655 static inline struct m0_fab__tm *libfab_buf_ma(struct m0_net_buffer *buf)
1656 {
1657  return buf->nb_tm != NULL ? buf->nb_tm->ntm_xprt_private : NULL;
1658 }
1659 
1663 static void libfab_buf_fini(struct m0_fab__buf *buf)
1664 {
1665  struct m0_fab__buf *fbp;
1666 
1667  M0_ENTRY("fb=%p q=%d rc=%d", buf, buf->fb_nb->nb_qtype, buf->fb_status);
1668 
1669  libfab_buf_invariant(buf);
1670 
1671  fab_buf_tlink_fini(buf);
1672  if (buf->fb_ev_ep != NULL)
1673  buf->fb_ev_ep = NULL;
1674  if (buf->fb_bulk_op != NULL && fab_bulk_tlink_is_in(buf->fb_bulk_op)) {
1675  fab_bulk_tlist_del(buf->fb_bulk_op);
1676  m0_free(buf->fb_bulk_op);
1677  buf->fb_bulk_op = NULL;
1678  }
1679 
1680  if (buf->fb_txctx != NULL) {
1681  fbp = m0_tl_find(fab_sndbuf, fbp, &buf->fb_txctx->fep_sndbuf,
1682  fbp == buf);
1683  if (fbp != NULL) {
1684  fab_sndbuf_tlist_del(fbp);
1685  M0_LOG(M0_DEBUG, "buf=%p tmout/del before queued", fbp);
1686  }
1687  buf->fb_txctx = NULL;
1688  }
1689  buf->fb_status = 0;
1690  buf->fb_length = 0;
1691  buf->fb_token = 0;
1692  /*
1693  * If the buffer operation has timedout or has been cancelled by
1694  * application, then the buffer has also been de-registered to prevent
1695  * data corruption due to any ongoing operations. In such cases, the
1696  * buffer state is reset to FAB_BUF_INITIALIZED so that it will be
1697  * re-registered when the application will try to re-use it.
1698  */
1699  buf->fb_state = (buf->fb_state == FAB_BUF_CANCELED ||
1700  buf->fb_state == FAB_BUF_TIMEDOUT) ?
1701  FAB_BUF_INITIALIZED : FAB_BUF_REGISTERED;
1702 
1703  M0_LEAVE("fb_state=%d", buf->fb_state);
1704 }
1705 
1709 static bool libfab_dom_invariant(const struct m0_net_domain *dom)
1710 {
1711  struct m0_fab__ndom *fnd = dom->nd_xprt_private;
1712  return _0C(!fab_fabs_tlist_is_empty(&fnd->fnd_fabrics)) &&
1713  _0C(dom->nd_xprt == &m0_net_libfab_xprt);
1714 }
1715 
1719 static bool libfab_tm_invariant(const struct m0_fab__tm *fab_tm)
1720 {
1721  return fab_tm != NULL &&
1722  fab_tm->ftm_ntm->ntm_xprt_private == fab_tm &&
1723  libfab_dom_invariant(fab_tm->ftm_ntm->ntm_dom);
1724 }
1725 
/*
 * Buffer invariant. The visible expression is an exclusive-or of two cases:
 * the network buffer has exactly the M0_NET_BUF_REGISTERED flag and no
 * transfer machine, or it is queued to a machine (nb_tm != NULL).
 *
 * NOTE(review): listing lines 1736 and 1738 are absent from this extract
 * (the numbering jumps 1735 -> 1737 -> 1739), so the second operand of the
 * exclusive-or is incomplete here. Restore them from the original file
 * before changing this function.
 */
1729 static bool libfab_buf_invariant(const struct m0_fab__buf *buf)
1730 {
1731  const struct m0_net_buffer *nb = buf->fb_nb;
1732 
1733  return (nb->nb_flags == M0_NET_BUF_REGISTERED &&
1734  nb->nb_tm == NULL) ^ /* or (exclusively) ... */
1735  /* it is queued to a machine. */
1737  _0C(nb->nb_tm != NULL) &&
1739 }
1740 
/*
 * Completes a buffer operation.
 *
 * Builds a buffer event carrying the network buffer, the buffer status and
 * the current time. For a received message that completed successfully and
 * has a source endpoint, the endpoint is attached to the event and a
 * reference is taken on it. The buffer is then removed from the transfer
 * machine's hash table and reset with libfab_buf_fini(). The callback
 * counter of the network transfer machine is raised for the duration of the
 * lock hand-over at the end.
 *
 * NOTE(review): listing lines 1757-1758 (remainder of the M0_IN() queue-type
 * list) and 1775 (the statement executed between unlocking and re-locking
 * the transfer machine) are absent from this extract. Restore them from the
 * original file before changing this function.
 */
1744 static void libfab_buf_complete(struct m0_fab__buf *buf)
1745 {
1746  struct m0_fab__tm *ma = libfab_buf_tm(buf);
1747  struct m0_net_buffer *nb = buf->fb_nb;
1748  struct m0_net_buffer_event ev = {
1749  .nbe_buffer = nb,
1750  .nbe_status = buf->fb_status,
1751  .nbe_time = m0_time_now()
1752  };
1753 
1754  M0_ENTRY("fb=%p nb=%p q=%d rc=%d", buf, nb, buf->fb_nb->nb_qtype,
1755  buf->fb_status);
/* Only the listed queue types report a length in the event. */
1756  if (M0_IN(nb->nb_qtype, (M0_NET_QT_MSG_RECV,
1759  ev.nbe_length = buf->fb_length;
1760  }
1761 
1762  if (nb->nb_qtype == M0_NET_QT_MSG_RECV) {
1763  if (ev.nbe_status == 0 && buf->fb_ev_ep != NULL) {
1764  ev.nbe_ep = &buf->fb_ev_ep->fep_nep;
1765  libfab_ep_get(buf->fb_ev_ep);
1766  }
1767  }
1768  ma->ftm_ntm->ntm_callback_counter++;
1769 
1770  fab_bufhash_htable_del(&ma->ftm_bufhash.bht_hash, buf);
1771  libfab_buf_fini(buf);
1772  M0_ASSERT(libfab_tm_invariant(ma));
/* The event-post lock is held while the transfer machine lock is dropped. */
1773  libfab_tm_evpost_lock(ma);
1774  libfab_tm_unlock(ma);
1776  libfab_tm_lock(ma);
1777  libfab_tm_evpost_unlock(ma);
1778  M0_ASSERT(libfab_tm_invariant(ma));
1779  M0_ASSERT(M0_IN(ma->ftm_ntm->ntm_state, (M0_NET_TM_STARTED,
1780  M0_NET_TM_STOPPING)));
1781  ma->ftm_ntm->ntm_callback_counter--;
1782 }
1783 
1789 static int libfab_dummy_msg_rcv_chk(struct m0_fab__buf *fbp)
1790 {
1791  struct m0_fab__tm *ma = libfab_buf_tm(fbp);
1792  struct m0_net_buffer *nb = fbp->fb_nb;
1793  struct m0_fab__buf *pas_buf;
1794  struct iovec iv;
1795  uint32_t *ptr;
1796  uint32_t token;
1797  int ret = -1;
1798 
1799  if (fbp->fb_length == (sizeof(uint32_t) * 2)) {
1800  ptr = (uint32_t *)nb->nb_buffer.ov_buf[0];
1801  if (*ptr == FAB_DUMMY_DATA) {
1802  ptr++;
1803  token = *ptr;
1804  pas_buf = fab_bufhash_htable_lookup(
1805  &ma->ftm_bufhash.bht_hash,
1806  &token);
1807  if (pas_buf != NULL)
1808  libfab_buf_complete(pas_buf);
1809 
1810  /*
1811  * Repost this buffer to the receive
1812  * queue without generating a callback
1813  * as it contains only dummy data
1814  */
1815  fbp->fb_length = nb->nb_length;
1816  iv.iov_base = nb->nb_buffer.ov_buf[0];
1817  iv.iov_len = nb->nb_buffer.ov_vec.v_count[0];
1818  M0_ASSERT(fi_recvv(ma->ftm_rctx, &iv,
1819  fbp->fb_mr.bm_desc, 1, 0,
1820  U32_TO_VPTR(fbp->fb_token)) == 0);
1821  ret = 0;
1822  }
1823  }
1824 
1825  return ret;
1826 }
1827 
1831 static void libfab_buf_done(struct m0_fab__buf *buf, int rc, bool add_to_list)
1832 {
1833  struct m0_fab__tm *ma = libfab_buf_tm(buf);
1834  struct m0_net_buffer *nb = buf->fb_nb;
1835 
1836  M0_ENTRY("fb=%p nb=%p q=%d len=%d rc=%d", buf, nb, nb->nb_qtype,
1837  (int)buf->fb_length, rc);
1838  M0_PRE(libfab_tm_is_locked(ma));
1839  /*
1840  * Multiple libfab_buf_done() calls on the same buffer are possible if
1841  * the buffer is cancelled.
1842  */
1843  if (!fab_buf_tlink_is_in(buf)) {
1844  buf->fb_status = buf->fb_status == 0 ? rc : buf->fb_status;
1845  /* Try to finalise. */
1846  if (m0_thread_self() == &ma->ftm_poller && !add_to_list) {
1847  if (libfab_dummy_msg_rcv_chk(buf) != 0)
1848  libfab_buf_complete(buf);
1849  } else {
1850  /*
1851  * Otherwise, postpone finalisation to
1852  * libfab_tm_buf_done().
1853  */
1854  buf->fb_status = rc;
1855  fab_buf_tlist_add_tail(&ma->ftm_done, buf);
1856  }
1857  }
1858 }
1859 
1863 static inline void libfab_ep_get(struct m0_fab__ep *ep)
1864 {
1865  m0_ref_get(&ep->fep_nep.nep_ref);
1866 }
1867 
1874 static void libfab_ep_release(struct m0_ref *ref)
1875 {
1876  struct m0_net_end_point *nep;
1877  struct m0_fab__ep *ep;
1878  struct m0_fab__tm *tm;
1879 
1880  nep = container_of(ref, struct m0_net_end_point, nep_ref);
1881  ep = libfab_ep(nep);
1882  tm = nep->nep_tm->ntm_xprt_private;
1883  M0_LOG(M0_DEBUG, "free endpoint %s", (char*)ep->fep_name.nia_p);
1884 
1885  m0_nep_tlist_del(nep);
1886  libfab_ep_param_free(ep, tm);
1887 }
1888 
1892 static uint64_t libfab_mr_keygen(struct m0_fab__tm *tm)
1893 {
1894  uint64_t key = FAB_MR_KEY + tm->ftm_mr_key_idx;
1895  tm->ftm_mr_key_idx++;
1896  return key;
1897 }
1898 
1902 static int libfab_check_for_event(struct fid_eq *eq, uint32_t *ev)
1903 {
1904  struct fi_eq_cm_entry entry;
1905  struct fi_eq_err_entry err_entry;
1906  uint32_t event = 0;
1907  int rc;
1908 
1909  rc = fi_eq_read(eq, &event, &entry, sizeof(entry), 0);
1910  if (rc == -FI_EAVAIL) {
1911  memset(&err_entry, 0, sizeof(err_entry));
1912  fi_eq_readerr(eq, &err_entry, 0);
1913  rc = -err_entry.err;
1914  M0_LOG(M0_DEBUG, "Error = %d %s %s\n", rc,
1915  fi_strerror(err_entry.err),
1916  fi_eq_strerror(eq, err_entry.prov_errno,
1917  err_entry.err_data,NULL, 0));
1918  }
1919 
1920  *ev = rc < 0 ? 0xFF : event;
1921  return rc;
1922 }
1923 
1929 static int libfab_check_for_comp(struct fid_cq *cq, uint32_t *ctx,
1930  m0_bindex_t *len, uint64_t *data)
1931 {
1932  struct fi_cq_data_entry entry[FAB_MAX_COMP_READ];
1933  struct fi_cq_err_entry err_entry;
1934  uint64_t wr_cqdata = FI_REMOTE_WRITE | FI_REMOTE_CQ_DATA;
1935  int i;
1936  int ret;
1937 
1938  ret = fi_cq_read(cq, entry, FAB_MAX_COMP_READ);
1939  if (ret > 0) {
1940  for (i = 0; i < ret; i++) {
1941  ctx[i] = entry[i].op_context == NULL ? 0 :
1942  VPTR_TO_U32(entry[i].op_context);
1943  if (len != NULL)
1944  len[i] = entry[i].len;
1945  if (data != NULL)
1946  data[i] = ((entry[i].flags & wr_cqdata)) ?
1947  entry[i].data : 0;
1948  }
1949  } else if (ret != -FI_EAGAIN) {
1950  memset(&err_entry, 0, sizeof(err_entry));
1951  fi_cq_readerr(cq, &err_entry, 0);
1952  M0_LOG(M0_DEBUG, "Error = %d %s %s\n", ret,
1953  fi_strerror(err_entry.err),
1954  fi_cq_strerror(cq, err_entry.prov_errno,
1955  err_entry.err_data, NULL, 0));
1956  }
1957 
1958  return ret;
1959 }
1960 
1967 static void libfab_tm_fini(struct m0_net_transfer_mc *tm)
1968 {
1969  struct m0_fab__tm *ma = tm->ntm_xprt_private;
1970  int rc;
1971 
1972  if (ma->ftm_state != FAB_TM_SHUTDOWN) {
1973  while (1) {
1974  libfab_tm_lock(ma);
1975  if (m0_mutex_trylock(&ma->ftm_evpost) != 0) {
1976  libfab_tm_unlock(ma);
1977  } else
1978  break;
1979  }
1980  m0_mutex_unlock(&ma->ftm_evpost);
1981  m0_mutex_lock(&ma->ftm_endlock);
1982  ma->ftm_state = FAB_TM_SHUTDOWN;
1983  m0_mutex_unlock(&ma->ftm_endlock);
1984 
1985  libfab_tm_buf_done(ma);
1986 
1987  rc = libfab_tm_param_free(ma);
1988  if (rc != 0)
1989  M0_LOG(M0_ERROR, "libfab_tm_param_free ret=%d", rc);
1990 
1991  m0_mutex_fini(&ma->ftm_endlock);
1992  m0_mutex_fini(&ma->ftm_evpost);
1993  libfab_tm_unlock(ma);
1994  }
1995 
1996  M0_LEAVE();
1997 }
1998 
2002 static int libfab_bdesc_encode(struct m0_fab__buf *buf)
2003 {
2004  struct m0_fab__bdesc *fbd;
2005  struct fi_rma_iov *iov;
2006  struct m0_net_buf_desc *nbd = &buf->fb_nb->nb_desc;
2007  struct m0_net_buffer *nb = buf->fb_nb;
2008  struct m0_fab__tm *tm = libfab_buf_ma(nb);
2009  int seg_nr = nb->nb_buffer.ov_vec.v_nr;
2010  struct m0_fab__ndom *nd = nb->nb_dom->nd_xprt_private;
2011  int i;
2012  bool is_verbs = libfab_is_verbs(tm);
2013 
2014  M0_PRE(seg_nr <= nd->fnd_seg_nr);
2015 
2016  nbd->nbd_len = (sizeof(struct m0_fab__bdesc) +
2017  (sizeof(struct fi_rma_iov) * seg_nr));
2018  nbd->nbd_data = m0_alloc(nbd->nbd_len);
2019  if (nbd->nbd_data == NULL)
2020  return M0_RC(-ENOMEM);
2021 
2022  fbd = (struct m0_fab__bdesc *)nbd->nbd_data;
2023  fbd->fbd_netaddr = tm->ftm_pep->fep_name.nia_n;
2024  fbd->fbd_buftoken = buf->fb_token;
2025 
2026  fbd->fbd_iov_cnt = (uint32_t)seg_nr;
2027  iov = (struct fi_rma_iov *)(nbd->nbd_data +
2028  sizeof(struct m0_fab__bdesc));
2029 
2030  for (i = 0; i < seg_nr; i++) {
2031  iov[i].addr = is_verbs ? (uint64_t)nb->nb_buffer.ov_buf[i] : 0;
2032  iov[i].key = fi_mr_key(buf->fb_mr.bm_mr[i]);
2033  iov[i].len = nb->nb_buffer.ov_vec.v_count[i];
2034  }
2035 
2036  return M0_RC(0);
2037 }
2038 
2042 static void libfab_bdesc_decode(struct m0_fab__buf *fb,
2043  struct m0_net_ip_params *addr)
2044 {
2045  struct m0_net_buffer *nb = fb->fb_nb;
2046  struct m0_fab__ndom *ndom = nb->nb_dom->nd_xprt_private;
2047 
2048  fb->fb_rbd = (struct m0_fab__bdesc *)(nb->nb_desc.nbd_data);
2049  fb->fb_riov = (struct fi_rma_iov *)(nb->nb_desc.nbd_data +
2050  sizeof(struct m0_fab__bdesc));
2051  *addr = fb->fb_rbd->fbd_netaddr;
2052  M0_ASSERT(fb->fb_rbd->fbd_iov_cnt <= ndom->fnd_seg_nr);
2053 }
2054 
2058 static int libfab_buf_dom_reg(struct m0_net_buffer *nb, struct m0_fab__tm *tm)
2059 {
2060  struct m0_fab__buf *fbp;
2061  struct m0_fab__buf_mr *mr;
2062  struct m0_fab__ndom *ndom;
2063  struct fid_domain *dp;
2064  uint64_t key;
2065  uint32_t retry_cnt;
2066  int seg_nr;
2067  int i;
2068  int ret = 0;
2069 
2070  M0_PRE(nb != NULL && nb->nb_dom != NULL && tm != NULL);
2071  fbp = nb->nb_xprt_private;
2072  seg_nr = nb->nb_buffer.ov_vec.v_nr;
2073  ndom = nb->nb_dom->nd_xprt_private;
2074  dp = tm->ftm_fab->fab_dom;
2075 
2076  M0_ASSERT(fbp != NULL && dp != NULL && ndom != NULL);
2077  M0_ASSERT(seg_nr <= ndom->fnd_seg_nr);
2078 
2079  mr = &fbp->fb_mr;
2080  if (fbp->fb_dp == dp)
2081  return M0_RC(ret);
2082 
2083  if (fbp->fb_state == FAB_BUF_REGISTERED)
2084  M0_LOG(M0_ERROR,"Re-registration of buffer");
2085 
2086  for (i = 0; i < seg_nr; i++) {
2087  /*
2088  * Sometimes the requested key is not available and
2089  * hence try with some other key for registration
2090  */
2091  ret = -1;
2092  retry_cnt = 20;
2093 
2094  while (ret != 0 && retry_cnt > 0) {
2095  key = libfab_mr_keygen(tm);
2096  ret = fi_mr_reg(dp, nb->nb_buffer.ov_buf[i],
2097  nb->nb_buffer.ov_vec.v_count[i],
2098  FAB_MR_ACCESS, FAB_MR_OFFSET, key,
2099  FAB_MR_FLAG, &mr->bm_mr[i], NULL);
2100  --retry_cnt;
2101  }
2102 
2103  if (ret != 0) {
2104  M0_LOG(M0_ERROR, "fi_mr_reg failed %d key=0x%"PRIx64,
2105  ret, key);
2106  break;
2107  }
2108 
2109  mr->bm_desc[i] = fi_mr_desc(mr->bm_mr[i]);
2110  }
2111 
2112  if (ret == 0) {
2113  fbp->fb_dp = dp;
2114  fbp->fb_state = FAB_BUF_REGISTERED;
2115  }
2116 
2117  return M0_RC(ret);
2118 }
2119 
/*
 * Drains the pending-send list of endpoint ep.
 *
 * Every buffer taken off ep->fep_sndbuf gets ep as its transmit context and
 * is handed to libfab_txbuf_list_add(); a buffer for which that fails is
 * finished with the error through libfab_buf_done(). Any queue type not
 * handled by the switch trips an assertion. If at least one buffer was
 * processed, the buffer queue of its transfer machine is processed at the
 * end.
 *
 * NOTE(review): listing lines 2137-2138 are absent from this extract (the
 * numbering jumps 2136 -> 2139). Whatever they contain sits between the
 * M0_NET_QT_MSG_SEND label and the libfab_txbuf_list_add() call; restore
 * them from the original file before changing this function.
 */
2124 static void libfab_pending_bufs_send(struct m0_fab__ep *ep)
2125 {
2126  struct m0_fab__active_ep *aep;
2127  struct m0_fab__buf *fbp;
2128  struct m0_net_buffer *nb = NULL;
2129  int ret = 0;
2130 
2131  aep = libfab_aep_get(ep);
2132  m0_tl_teardown(fab_sndbuf, &ep->fep_sndbuf, fbp) {
2133  nb = fbp->fb_nb;
2134  fbp->fb_txctx = ep;
2135  switch (nb->nb_qtype) {
2136  case M0_NET_QT_MSG_SEND:
2139  ret = libfab_txbuf_list_add(libfab_buf_ma(nb),
2140  fbp, aep);
2141  break;
2142  default:
2143  M0_ASSERT(0); /* Invalid queue type */
2144  break;
2145  }
2146  if (ret != 0)
2147  libfab_buf_done(fbp, ret, false);
2148  }
2149 
/* nb is the last buffer processed; NULL means the list was empty. */
2150  if (nb != NULL)
2151  libfab_bufq_process(libfab_buf_ma(nb));
2152 }
2153 
2158 static int libfab_target_notify(struct m0_fab__buf *buf)
2159 {
2160  struct m0_fab__active_ep *aep;
2161  struct m0_fab__buf *fbp;
2162  struct m0_fab__tm *tm;
2163  struct iovec iv;
2164  struct fi_msg op_msg;
2165  int ret = 0;
2166 
2167  M0_PRE(buf != NULL && buf->fb_txctx != NULL);
2168  aep = libfab_aep_get(buf->fb_txctx);
2169  M0_ASSERT(aep != NULL);
2170 
2171  if (buf->fb_nb->nb_qtype == M0_NET_QT_ACTIVE_BULK_RECV &&
2172  aep->aep_tx_state == FAB_CONNECTED) {
2173  M0_ALLOC_PTR(fbp);
2174  if (fbp == NULL)
2175  return M0_ERR(-ENOMEM);
2176 
2177  fbp->fb_nb = NULL;
2178  fbp->fb_dummy[0] = FAB_DUMMY_DATA;
2179  fbp->fb_dummy[1] = buf->fb_rbd->fbd_buftoken;
2180  fbp->fb_txctx = buf->fb_txctx;
2181  tm = libfab_buf_tm(buf);
2182  fbp->fb_token = libfab_buf_token_get(tm, fbp);
2183  aep->aep_bulk_cnt++;
2184  m0_tlink_init(&fab_bufhash_tl, fbp);
2185  fab_bufhash_htable_add(&tm->ftm_bufhash.bht_hash, fbp);
2186 
2187  iv.iov_base = fbp->fb_dummy;
2188  iv.iov_len = sizeof(fbp->fb_dummy);
2189  op_msg.msg_iov = &iv;
2190  op_msg.desc = NULL;
2191  op_msg.iov_count = 1;
2192  op_msg.addr = 0;
2193  op_msg.context = U32_TO_VPTR(fbp->fb_token);
2194  op_msg.data = 0;
2195  fbp->fb_wr_cnt = 1;
2196  ret = fi_sendmsg(aep->aep_txep, &op_msg, FI_COMPLETION);
2197  if (ret != 0) {
2198  M0_LOG(M0_ERROR,"tgt notify fail %d opcnt=%d", ret,
2199  aep->aep_bulk_cnt);
2200  fab_bufhash_htable_del(&tm->ftm_bufhash.bht_hash, fbp);
2201  --aep->aep_bulk_cnt;
2202  m0_free(fbp);
2203  }
2204  }
2205 
2206  return M0_RC(ret);
2207 }
2208 
2212 static struct m0_fab__fab *libfab_newfab_init(struct m0_fab__ndom *fnd)
2213 {
2214  struct m0_fab__fab *fab = NULL;
2215 
2216  M0_ALLOC_PTR(fab);
2217  if (fab != NULL)
2218  fab_fabs_tlink_init_at_tail(fab, &fnd->fnd_fabrics);
2219  return fab;
2220 }
2221 
/*
 * Tries again to resolve the host name stored in the printable address of
 * endpoint ep.
 *
 * The host part is located by skipping the first two ':'-terminated fields
 * of the printable address and is passed to m0_net_hostname_to_ip(). On
 * success the resulting ip string is converted with inet_pton() into the
 * numeric address of the endpoint and the packed 64-bit endpoint name is
 * recomputed with libfab_ep_pton(); on failure an error is logged.
 *
 * Returns the result of m0_net_hostname_to_ip() when the resolution is
 * attempted, 0 otherwise.
 *
 * NOTE(review): listing line 2235 is absent from this extract. The closing
 * brace on line 2252 has no visible opener, so presumably line 2235 holds
 * the condition guarding the whole resolution block -- restore it from the
 * original file before changing this function.
 */
2226 static int libfab_dns_resolve_retry(struct m0_fab__ep *ep)
2227 {
2228  struct m0_net_ip_addr *en = &ep->fep_name;
2229  int rc = 0;
2230  enum m0_net_ip_format not_used;
2231  char *fqdn = en->nia_p;
2232  char ip[LIBFAB_ADDR_LEN_MAX] = {};
2233 
2234  /* Verify if ip addr is resolved and ip is valid */
2236  fqdn = strchr(fqdn, ':'); /* Skip '<inet/inet6>:' */
2237  fqdn = strchr(fqdn + 1, ':'); /* Skip '<tcp/verbs>:' */
2238  fqdn++;
2239 
2240  rc = m0_net_hostname_to_ip(fqdn, ip, &not_used);
2241  if (rc == 0) {
2242  inet_pton(en->nia_n.nip_fmt_pvt.ia.nia_family ==
2243  M0_NET_IP_AF_INET ? AF_INET : AF_INET6,
2244  ip, &en->nia_n.nip_ip_n.sn[0]);
2245  libfab_ep_pton(en, &ep->fep_name_n);
2246  M0_LOG(M0_DEBUG, "ip=%s port=%d fqdn=%s", (char *)ip,
2247  (int)en->nia_n.nip_port, (char *)fqdn);
2248  } else
2249  M0_LOG(M0_ERROR, "%s failed with err %d for %s",
2250  rc > 0 ? "gethostbyname()" : "hostname_to_ip()",
2251  rc, fqdn);
2252  }
2253 
2254  return M0_RC(rc);
2255 }
2256 
/**
 * Starts a connection to the remote endpoint @ep, if none is in progress,
 * and queues @fbp on the endpoint's pending-send list.
 *
 * For an endpoint in FAB_NOT_CONNECTED state the destination address is
 * resolved again, the local address is passed as connection data and
 * fi_connect() is issued; on success the transmit state becomes
 * FAB_CONNECTING and the connecting timeout is armed.
 *
 * @return 0 when the buffer was queued, or when fi_connect() returned
 *         -ECONNREFUSED (see below); otherwise the error from fi_connect().
 */
static int libfab_conn_init(struct m0_fab__ep *ep, struct m0_fab__tm *ma,
			    struct m0_fab__buf *fbp)
{
	struct m0_fab__active_ep *aep = libfab_aep_get(ep);
	struct m0_fab__conn_data  cd;
	uint64_t                  dst;
	size_t                    cm_max_size = 0;
	size_t                    opt_size = sizeof(size_t);
	int                       ret = 0;

	if (aep->aep_tx_state == FAB_NOT_CONNECTED) {
		/*
		 * The destination address may not have been resolved yet;
		 * give the resolution another try before connecting.
		 */
		libfab_dns_resolve_retry(ep);
		dst = ep->fep_name_n | 0x02;
		cd.fcd_addr = ma->ftm_pep->fep_name.nia_n;

		ret = fi_getopt(&aep->aep_txep->fid, FI_OPT_ENDPOINT,
				FI_OPT_CM_DATA_SIZE,
				&cm_max_size, &opt_size);
		M0_ASSERT(ret == 0 && sizeof(cd) < cm_max_size);

		ret = fi_connect(aep->aep_txep, &dst, &cd, sizeof(cd));
		if (ret != 0) {
			M0_LOG(M0_DEBUG, "Conn req failed ret=%d dst=%"PRIx64,
			       ret, dst);
		} else {
			aep->aep_tx_state = FAB_CONNECTING;
			aep->aep_connecting_tmout = m0_time_from_now(
						FAB_CONNECTING_TMOUT, 0);
		}
	}

	switch (ret) {
	case 0:
		fab_sndbuf_tlink_init_at_tail(fbp, &ep->fep_sndbuf);
		break;
	case -ECONNREFUSED:
		/*
		 * An immediate -ECONNREFUSED means the remote service has
		 * not started yet. The buffer is finished with that status
		 * via the done list and 0 is returned, so that the RPC layer
		 * does not flood the network with repeated retries.
		 */
		libfab_buf_done(fbp, -ECONNREFUSED, true);
		ret = 0;
		M0_LOG(M0_DEBUG, "Err=%d fb=%p nb=%p", fbp->fb_status, fbp,
		       fbp->fb_nb);
		break;
	default:
		break;
	}

	return ret;
}
2316 
2320 static int libfab_fab_ep_find(struct m0_fab__tm *tm, const char *name,
2321  struct m0_net_ip_params *addr,
2322  struct m0_fab__ep **ep)
2323 {
2324  struct m0_net_transfer_mc *ntm = tm->ftm_ntm;
2325  struct m0_net_end_point *net;
2326  int ret;
2327 
2328  ret = libfab_ep_find(ntm, name, addr, &net);
2329  if (ret == 0)
2330  *ep = libfab_ep(net);
2331 
2332  return M0_RC(ret);
2333 }
2334 
2338 static void libfab_ep_pton(struct m0_net_ip_addr *name, uint64_t *out)
2339 {
2340  uint32_t addr = name->nia_n.nip_ip_n.sn[0];
2341  uint32_t port = name->nia_n.nip_port;
2342 
2343  M0_ASSERT(port < 65536);
2344  port = htonl(port);
2345 
2346  *out = ((uint64_t)addr << 32) | port;
2347 }
2348 
2356 static int libfab_txep_init(struct m0_fab__active_ep *aep,
2357  struct m0_fab__tm *tm, void *ctx)
2358 {
2359  struct m0_fab__ep *ep = (struct m0_fab__ep *)ctx;
2360  struct m0_net_ip_addr *en = &ep->fep_name;
2361  struct m0_fab__fab *fab = tm->ftm_fab;
2362  struct fi_info *info;
2363  struct fi_info *hints = NULL;
2364  int rc;
2365  char ip[LIBFAB_ADDR_LEN_MAX] = {};
2366  char port[LIBFAB_PORT_LEN_MAX] = {};
2367 
2368  if (aep->aep_txep != NULL) {
2369  rc = fi_close(&aep->aep_txep->fid);
2370  if (rc != 0)
2371  M0_LOG(M0_ERROR,"aep_txep close failed %d",rc);
2372 
2373  rc = libfab_ep_txres_free(&aep->aep_tx_res, tm);
2374  if (rc != 0)
2375  M0_LOG(M0_ERROR,"ep_txres_free failed %d",rc);
2376  }
2377  aep->aep_tx_state = FAB_NOT_CONNECTED;
2378  aep->aep_txq_full = false;
2379  ep->fep_connlink = FAB_CONNLINK_DOWN;
2380 
2381  hints = fi_allocinfo();
2382  if (hints == NULL)
2383  return M0_ERR(-ENOMEM);
2384  hints->ep_attr->type = FI_EP_MSG;
2385  hints->caps = FI_MSG | FI_RMA;
2386 
2387  hints->mode |= FI_RX_CQ_DATA;
2388  hints->domain_attr->cq_data_size = 4;
2389  hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED |
2390  FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
2391  hints->fabric_attr->prov_name = fab->fab_fi->fabric_attr->prov_name;
2392 
2393  libfab_straddr_gen(&en->nia_n, ip);
2394  snprintf(port, ARRAY_SIZE(port), "%d", en->nia_n.nip_port);
2395  rc = fi_getinfo(LIBFAB_VERSION, ip, port, 0, hints, &info);
2396  if (rc != 0) {
2397  hints->fabric_attr->prov_name = NULL;
2398  fi_freeinfo(hints);
2399  return M0_ERR(rc);
2400  }
2401 
2402  rc = fi_endpoint(fab->fab_dom, info, &aep->aep_txep, NULL) ? :
2403  libfab_ep_txres_init(aep, tm, ctx) ? :
2404  fi_enable(aep->aep_txep);
2405 
2406  hints->fabric_attr->prov_name = NULL;
2407  fi_freeinfo(hints);
2408  fi_freeinfo(info);
2409 
2410  return M0_RC(rc);
2411 }
2412 
2418 static int libfab_fid_array_grow(struct m0_fab__tm_fids *tmfid, uint32_t incr)
2419 {
2420  struct m0_fab__ev_ctx **old_ctx = tmfid->ftf_ctx;
2421  struct m0_fab__ev_ctx **new_ctx = NULL;
2422  struct fid **old_fid = tmfid->ftf_head;
2423  struct fid **new_fid = NULL;
2424  uint32_t old_size = tmfid->ftf_arr_size;
2425  uint32_t new_size = old_size + incr;
2426  int i;
2427 
2428  M0_PRE(old_ctx != NULL && old_fid != NULL && old_size < new_size);
2429 
2430  M0_ALLOC_ARR(new_ctx, new_size);
2431  M0_ALLOC_ARR(new_fid, new_size);
2432  if (new_ctx == NULL || new_fid == NULL) {
2433  m0_free(new_ctx);
2434  m0_free(new_fid);
2435  return M0_ERR(-ENOMEM);
2436  }
2437 
2438  /* Copy fids from old array to new array. */
2439  for (i = 0; i < old_size; i++) {
2440  new_ctx[i] = old_ctx[i];
2441  new_fid[i] = old_fid[i];
2442  }
2443  tmfid->ftf_ctx = new_ctx;
2444  tmfid->ftf_head = new_fid;
2445  tmfid->ftf_arr_size = new_size;
2446 
2447  M0_LOG(M0_DEBUG,"old={fid=%p ctx=%p size=%d} new={fid=%p ctx=%p size=%d}",
2448  old_fid, old_ctx, old_size, new_fid, new_ctx, new_size);
2449 
2450  /* Free old array */
2451  m0_free(old_ctx);
2452  m0_free(old_fid);
2453 
2454  return M0_RC(0);
2455 }
2456 
2460 static int libfab_waitfd_bind(struct fid* fid, struct m0_fab__tm *tm, void *ctx)
2461 {
2462  struct m0_fab__tm_fids *tmfid = &tm->ftm_fids;
2463  struct m0_fab__ev_ctx *ptr = ctx;
2464  struct epoll_event ev;
2465  int fd;
2466  int rc;
2467 
2468  rc = fi_control(fid, FI_GETWAIT, &fd);
2469  if (rc != 0)
2470  return M0_ERR(rc);
2471 
2472  ev.events = EPOLLIN;
2473  ev.data.ptr = ptr;
2474  M0_LOG(M0_DEBUG, "ADD_TO_EPOLL %s=%p fd=%d ctx=%p tm=%p pos=%d",
2475  ptr->evctx_dbg, fid, fd, ctx, tm, (int)tmfid->ftf_cnt);
2476  rc = epoll_ctl(tm->ftm_epfd, EPOLL_CTL_ADD, fd, &ev);
2477 
2478  if (rc == 0) {
2479  if (tmfid->ftf_cnt >= (tmfid->ftf_arr_size - 1)) {
2480  rc = libfab_fid_array_grow(tmfid,
2481  FAB_TM_FID_MALLOC_STEP);
2482  if (rc != 0)
2483  return M0_ERR(rc);
2484  }
2485  tmfid->ftf_head[tmfid->ftf_cnt] = fid;
2486  tmfid->ftf_ctx[tmfid->ftf_cnt] = ptr;
2487  ptr->evctx_pos = tmfid->ftf_cnt;
2488  tmfid->ftf_cnt++;
2489  M0_ASSERT(tmfid->ftf_cnt < tmfid->ftf_arr_size);
2490  }
2491 
2492  return M0_RC(rc);
2493 }
2494 
2499 static int libfab_waitfd_unbind(struct fid* fid, struct m0_fab__tm *tm,
2500  void *ctx)
2501 {
2502  struct m0_fab__tm_fids *tmfid = &tm->ftm_fids;
2503  struct m0_fab__ev_ctx *ptr = ctx;
2504  struct epoll_event ev = {};
2505  int fd;
2506  int rc;
2507  int i;
2508 
2509  rc = fi_control(fid, FI_GETWAIT, &fd);
2510  if (rc != 0)
2511  return M0_ERR(rc);
2512 
2513  rc = epoll_ctl(tm->ftm_epfd, EPOLL_CTL_DEL, fd, &ev);
2514  if (rc == 0) {
2515  M0_LOG(M0_DEBUG, "DEL_FROM_EPOLL %s fid=%p fd=%d tm=%p pos=%d",
2516  ptr->evctx_dbg, fid, fd, tm, ptr->evctx_pos);
2517  for (i = ptr->evctx_pos; i < tmfid->ftf_cnt - 1; i++) {
2518  tmfid->ftf_head[i] = tmfid->ftf_head[i + 1];
2519  tmfid->ftf_ctx[i] = tmfid->ftf_ctx[i + 1];
2520  tmfid->ftf_ctx[i]->evctx_pos--;
2521  }
2522  --tmfid->ftf_cnt;
2523  tmfid->ftf_head[tmfid->ftf_cnt] = 0;
2524  tmfid->ftf_ctx[tmfid->ftf_cnt] = 0;
2525  ptr->evctx_pos = 0;
2526  }
2527 
2528  return M0_RC(rc);
2529 }
2530 
2534 static inline struct m0_fab__active_ep *libfab_aep_get(struct m0_fab__ep *ep)
2535 {
2536  return ep->fep_listen == NULL ? ep->fep_aep : ep->fep_listen->pep_aep;
2537 }
2538 
2542 static inline bool libfab_is_verbs(struct m0_fab__tm *tm)
2543 {
2544  return tm->ftm_fab->fab_prov == FAB_FABRIC_PROV_VERBS;
2545 }
2546 
2550 static int libfab_txbuf_list_add(struct m0_fab__tm *tm, struct m0_fab__buf *fb,
2551  struct m0_fab__active_ep *aep)
2552 {
2553  struct m0_fab__bulk_op *op;
2554 
2555  M0_ALLOC_PTR(op);
2556  if (op == NULL)
2557  return M0_ERR(-ENOMEM);
2558  op->fbl_aep = aep;
2559  op->fbl_buf = fb;
2560  fb->fb_bulk_op = op;
2561  fb->fb_wr_cnt = 0;
2562  M0_SET0(&fb->fb_xfer_params);
2563  fab_bulk_tlink_init_at_tail(op, &tm->ftm_bulk);
2564 
2565  return M0_RC(0);
2566 }
2567 
2571 static void libfab_bufq_process(struct m0_fab__tm *tm)
2572 {
2573  struct m0_fab__bulk_op *op;
2574  int ret;
2575 
2576  m0_tl_for(fab_bulk, &tm->ftm_bulk, op) {
2577  /*
2578  * Only post the bulk buffer if the endpoint is in
2579  * connected state.
2580  */
2581  if (op->fbl_aep->aep_tx_state == FAB_CONNECTED &&
2582  !op->fbl_aep->aep_txq_full) {
2583  if (op->fbl_buf->fb_nb->nb_qtype == M0_NET_QT_MSG_SEND)
2584  ret = libfab_ping_op(op->fbl_aep, op->fbl_buf);
2585  else
2586  ret = libfab_bulk_op(op->fbl_aep, op->fbl_buf);
2587 
2588  if (ret == 0) {
2589  fab_bulk_tlist_del(op);
2590  op->fbl_buf->fb_bulk_op = NULL;
2591  m0_free(op);
2592  } else
2593  op->fbl_aep->aep_txq_full = true;
2594  }
2595  } m0_tl_endfor;
2596 
2597 
2598 }
2599 
2604 static int libfab_ping_op(struct m0_fab__active_ep *aep, struct m0_fab__buf *fb)
2605 {
2606  struct fi_msg op_msg;
2607  struct iovec iv;
2608  int ret;
2609 
2610  iv.iov_base = fb->fb_nb->nb_buffer.ov_buf[0];
2611  iv.iov_len = fb->fb_nb->nb_buffer.ov_vec.v_count[0];
2612  op_msg.msg_iov = &iv;
2613  op_msg.desc = fb->fb_mr.bm_desc;
2614  op_msg.iov_count = 1;
2615  op_msg.addr = 0;
2616  op_msg.context = U32_TO_VPTR(fb->fb_token);
2617  op_msg.data = 0;
2618  fb->fb_wr_cnt = 1;
2619  ret = fi_sendmsg(aep->aep_txep, &op_msg, FI_COMPLETION);
2620  if (ret == 0)
2621  aep->aep_bulk_cnt += fb->fb_wr_cnt;
2622 
2623  return ret;
2624 }
2625 
2630 static int libfab_bulk_op(struct m0_fab__active_ep *aep, struct m0_fab__buf *fb)
2631 {
2632  struct m0_fab__buf_xfer_params xp;
2633  struct m0_fab__tm *tm = libfab_buf_tm(fb);
2634  struct fi_msg_rma op_msg;
2635  struct fi_rma_iov *r_iov;
2636  struct fi_rma_iov *remote = tm->ftm_rem_iov;
2637  struct iovec *loc_iv = tm->ftm_loc_iov;
2638  m0_bcount_t *v_cnt;
2639  uint64_t op_flag;
2640  uint32_t loc_slen;
2641  uint32_t rem_slen;
2642  uint32_t wr_cnt = 0;
2643  uint32_t max_iov = tm->ftm_fab->fab_max_iov;
2644  uint32_t idx;
2645  int ret = 0;
2646  bool isread;
2647  bool last_seg = false;
2648 
2649  M0_ENTRY("loc_buf=%p q=%d loc_seg=%d rem_buf=%d rem_seg=%d iov_max=%d",
2650  fb, fb->fb_nb->nb_qtype, fb->fb_nb->nb_buffer.ov_vec.v_nr,
2651  (int)fb->fb_rbd->fbd_buftoken, (int)fb->fb_rbd->fbd_iov_cnt,
2652  (int)max_iov);
2653  M0_PRE(fb->fb_rbd != NULL);
2654  M0_PRE(remote != NULL && loc_iv != NULL);
2655 
2656  v_cnt = fb->fb_nb->nb_buffer.ov_vec.v_count;
2657  /* Pick last succesfully transfered bulk buf params */
2658  xp = fb->fb_xfer_params;
2659  r_iov = fb->fb_riov;
2660  isread = (fb->fb_nb->nb_qtype == M0_NET_QT_ACTIVE_BULK_RECV);
2661 
2662  while (xp.bxp_xfer_len < fb->fb_nb->nb_length) {
2663  for (idx = 0; idx < max_iov && !last_seg; idx++) {
2664  M0_ASSERT(xp.bxp_rem_sidx <= fb->fb_rbd->fbd_iov_cnt);
2665  loc_slen = v_cnt[xp.bxp_loc_sidx] - xp.bxp_loc_soff;
2666  rem_slen = r_iov[xp.bxp_rem_sidx].len - xp.bxp_rem_soff;
2667 
2668  loc_iv[idx].iov_base = fb->fb_nb->nb_buffer.ov_buf[
2669  xp.bxp_loc_sidx] +
2670  xp.bxp_loc_soff;
2671  loc_iv[idx].iov_len = min64u(loc_slen, rem_slen);
2672  remote[idx] = r_iov[xp.bxp_rem_sidx];
2673  remote[idx].addr += xp.bxp_rem_soff;
2674  remote[idx].len -= xp.bxp_rem_soff;
2675 
2676  if (loc_slen > rem_slen) {
2677  xp.bxp_rem_sidx++;
2678  xp.bxp_rem_soff = 0;
2679  xp.bxp_loc_soff += loc_iv[idx].iov_len;
2680  } else {
2681  xp.bxp_loc_sidx++;
2682  xp.bxp_loc_soff = 0;
2683  xp.bxp_rem_soff += loc_iv[idx].iov_len;
2684  if (xp.bxp_rem_soff >=
2685  r_iov[xp.bxp_rem_sidx].len) {
2686  xp.bxp_rem_sidx++;
2687  xp.bxp_rem_soff = 0;
2688  }
2689  }
2690 
2691  xp.bxp_xfer_len += loc_iv[idx].iov_len;
2692  if (xp.bxp_xfer_len >= fb->fb_nb->nb_length)
2693  last_seg = true;
2694  }
2695 
2696  op_msg.msg_iov = &loc_iv[0];
2697  op_msg.desc = &fb->fb_mr.bm_desc[xp.bxp_loc_sidx];
2698  op_msg.iov_count = idx;
2699  op_msg.addr = xp.bxp_rem_soff;
2700  op_msg.rma_iov = &remote[0];
2701  op_msg.rma_iov_count = idx;
2702  op_msg.context = U32_TO_VPTR(fb->fb_token);
2703 
2704  op_msg.data = (isread || (!last_seg)) ? 0 :
2705  fb->fb_rbd->fbd_buftoken;
2706  op_flag = (isread || (!last_seg)) ? 0 : FI_REMOTE_CQ_DATA;
2707  op_flag |= last_seg ? FI_COMPLETION : 0;
2708 
2709  ret = isread ? fi_readmsg(aep->aep_txep, &op_msg, op_flag) :
2710  fi_writemsg(aep->aep_txep, &op_msg, op_flag);
2711 
2712  if (ret != 0) {
2713  M0_LOG(M0_ERROR,"bulk-op failed %d b=%p q=%d l_seg=%d \
2714  opcnt=%d", ret, fb, fb->fb_nb->nb_qtype,
2715  xp.bxp_loc_sidx, aep->aep_bulk_cnt);
2716  break;
2717  } else {
2718  wr_cnt++;
2719  aep->aep_bulk_cnt++;
2720  /* Save last succesfully transfered bulk buf params */
2721  fb->fb_xfer_params = xp;
2722  }
2723  }
2724  fb->fb_wr_cnt += wr_cnt;
2725  return M0_RC(ret);
2726 }
2727 
2731 static uint32_t libfab_buf_token_get(struct m0_fab__tm *tm,
2732  struct m0_fab__buf *fb)
2733 {
2734  union m0_fab__token token;
2735 
2736  token.t_val = 0;
2737  token.t_Fields.tf_queue_id = (fb->fb_nb == NULL) ? M0_NET_QT_NR :
2738  fb->fb_nb->nb_qtype;
2739  ++tm->ftm_op_id;
2740  if (tm->ftm_op_id == 0)
2741  ++tm->ftm_op_id; /* 0 is treated as an invalid value for token*/
2742  /* Queue selection round robin for a queue type */
2743  ++tm->ftm_rr_qt[token.t_Fields.tf_queue_id];
2744 
2745  token.t_Fields.tf_queue_num = (tm->ftm_rr_qt[token.t_Fields.tf_queue_id]
2746  % FAB_NUM_BUCKETS_PER_QTYPE);
2747  token.t_Fields.tf_tag = tm->ftm_op_id;
2748 
2749  return token.t_val;
2750 }
2751 
/**
 * Fill in the local IP string and the bulk segment limits of a domain.
 *
 * fi_getinfo() is queried for the verbs provider. When it succeeds, the
 * source address of the first returned entry and the verbs limits are used;
 * otherwise the loopback address and the TCP/socket limits are used.
 *
 * @return 0 on success, -ENOMEM if the hints structure cannot be allocated.
 */
static int libfab_domain_params_get(struct fab_ndom_unused_tag_guard *unused);
2785 
2786 static int libfab_buf_dom_dereg(struct m0_fab__buf *fbp)
2787 {
2788  m0_time_t tmout;
2789  int i;
2790  int ret = 0;
2791  uint32_t seg_nr;
2792 
2793  M0_PRE(fbp != NULL && fbp->fb_nb != NULL);
2794  seg_nr = fbp->fb_nb->nb_buffer.ov_vec.v_nr;
2795 
2796  for (i = 0; i < seg_nr; i++) {
2797  if (fbp->fb_mr.bm_mr[i] != NULL) {
2798  /*
2799  * If fi_close returns -EBUSY, that means that the
2800  * buffer is in use. In this case keep retry for a max
2801  * time of 5 min to deregister buffer till fi_close
2802  * returns success or some other error code.
2803  */
2804  tmout = m0_time_from_now(300, 0);
2805  ret = -EBUSY;
2806  while (ret == -EBUSY && !m0_time_is_in_past(tmout)) {
2807  ret = fi_close(&fbp->fb_mr.bm_mr[i]->fid);
2808  }
2809  if (ret != 0) {
2810  M0_LOG(M0_ERROR,"mr[%d] close failed %d fb=%p",
2811  i, ret, fbp);
2812  break;
2813  }
2814  fbp->fb_mr.bm_mr[i] = NULL;
2815  }
2816  }
2817 
2818  if (ret == 0) {
2819  fbp->fb_dp = NULL;
2820  fbp->fb_state = FAB_BUF_DEREGISTERED;
2821  }
2822 
2823  return M0_RC(ret);
2824 }
2825 
2826 /*============================================================================*/
2827 
2831 static int libfab_dom_init(const struct m0_net_xprt *xprt,
2832  struct m0_net_domain *dom)
2833 {
2834  struct m0_fab__ndom *fab_ndom;
2835  int ret = 0;
2836 
2837  M0_ENTRY("Running on %s", m0_processor_is_vm() ? "VM" : "HW");
2838 
2839  M0_ALLOC_PTR(fab_ndom);
2840  if (fab_ndom == NULL)
2841  return M0_ERR(-ENOMEM);
2842 
2843  ret = libfab_domain_params_get(fab_ndom);
2844  if (ret != 0)
2845  m0_free(fab_ndom);
2846  else {
2847  dom->nd_xprt_private = fab_ndom;
2848  fab_ndom->fnd_ndom = dom;
2849  m0_mutex_init(&fab_ndom->fnd_lock);
2850  fab_fabs_tlist_init(&fab_ndom->fnd_fabrics);
2851  }
2852  return M0_RC(ret);
2853 }
2854 
2858 static void libfab_dom_fini(struct m0_net_domain *dom)
2859 {
2860  struct m0_fab__ndom *fnd;
2861  struct m0_fab__fab *fab;
2862  int rc;
2863 
2864  M0_ENTRY();
2865  libfab_dom_invariant(dom);
2866  fnd = dom->nd_xprt_private;
2867  m0_tl_teardown(fab_fabs, &fnd->fnd_fabrics, fab) {
2868  if (fab->fab_dom != NULL) {
2869  rc = fi_close(&fab->fab_dom->fid);
2870  if (rc != 0)
2871  M0_LOG(M0_ERROR, "fab_dom fi_close ret=%d", rc);
2872  fab->fab_dom = NULL;
2873  }
2874 
2875  if (fab->fab_fab != NULL) {
2876  rc = fi_close(&fab->fab_fab->fid);
2877  if (rc != 0)
2878  M0_LOG(M0_ERROR, "fab_fabric fi_close ret=%d",
2879  rc);
2880  fab->fab_fab = NULL;
2881  }
2882 
2883  if (fab->fab_fi != NULL) {
2884  fi_freeinfo(fab->fab_fi);
2885  fab->fab_fi = NULL;
2886  }
2887 
2888  m0_free(fab);
2889  }
2890  fab_fabs_tlist_fini(&fnd->fnd_fabrics);
2891 
2892  m0_mutex_fini(&fnd->fnd_lock);
2893  fnd->fnd_ndom = NULL;
2894  m0_free(fnd);
2895  dom->nd_xprt_private = NULL;
2896 
2897  M0_LEAVE();
2898 }
2899 
2903 static void libfab_ma_fini(struct m0_net_transfer_mc *tm)
2904 {
2905  struct m0_fab__tm *ma = tm->ntm_xprt_private;
2906 
2907  M0_ENTRY();
2908  libfab_tm_fini(tm);
2909  tm->ntm_xprt_private = NULL;
2910 
2911  fab_buf_tlist_fini(&ma->ftm_done);
2912  m0_free(ma);
2913 
2914  M0_LEAVE();
2915 }
2916 
2922 static int libfab_ma_init(struct m0_net_transfer_mc *ntm)
2923 {
2924  struct m0_fab__tm *ftm;
2925  int rc = 0;
2926 
2927  M0_ASSERT(ntm->ntm_xprt_private == NULL);
2928  M0_ALLOC_PTR(ftm);
2929  if (ftm != NULL) {
2930  ftm->ftm_epfd = -1;
2931  ftm->ftm_state = FAB_TM_INIT;
2932  ntm->ntm_xprt_private = ftm;
2933  ftm->ftm_ntm = ntm;
2934  ftm->ftm_fids.ftf_cnt = 0;
2935  M0_ALLOC_ARR(ftm->ftm_fids.ftf_head, FAB_TM_FID_MALLOC_STEP);
2936  M0_ALLOC_ARR(ftm->ftm_fids.ftf_ctx, FAB_TM_FID_MALLOC_STEP);
2937  if (ftm->ftm_fids.ftf_head == NULL ||
2938  ftm->ftm_fids.ftf_ctx == NULL) {
2939  m0_free(ftm->ftm_fids.ftf_head);
2940  m0_free(ftm->ftm_fids.ftf_ctx);
2941  return M0_ERR(-ENOMEM);
2942  }
2943  ftm->ftm_fids.ftf_arr_size = FAB_TM_FID_MALLOC_STEP;
2944  fab_buf_tlist_init(&ftm->ftm_done);
2945  fab_bulk_tlist_init(&ftm->ftm_bulk);
2946  ftm->ftm_bufhash.bht_magic = M0_NET_LIBFAB_BUF_HT_HEAD_MAGIC;
2947  rc = fab_bufhash_htable_init(&ftm->ftm_bufhash.bht_hash,
2948  ((M0_NET_QT_NR + 1) *
2949  FAB_NUM_BUCKETS_PER_QTYPE));
2950  } else
2951  rc = M0_ERR(-ENOMEM);
2952 
2953  if (rc != 0 && ftm != NULL)
2954  libfab_ma_fini(ntm);
2955  return M0_RC(rc);
2956 }
2957 
2962 static int libfab_ma_start(struct m0_net_transfer_mc *ntm, const char *name)
2963 {
2964  struct m0_fab__tm *ftm = ntm->ntm_xprt_private;
2965  struct m0_fab__ndom *fnd;
2966  struct m0_net_end_point *nep;
2967  int rc = 0;
2968 
2969  M0_ASSERT(libfab_tm_is_locked(ftm));
2970  M0_ALLOC_PTR(ftm->ftm_pep);
2971  if (ftm->ftm_pep != NULL) {
2972  fnd = ntm->ntm_dom->nd_xprt_private;
2973  rc = libfab_ep_addr_decode(ftm->ftm_pep, name);
2974  if (rc != 0)
2975  return M0_ERR(rc);
2976 
2977  ftm->ftm_fab = libfab_newfab_init(fnd);
2978  ftm->ftm_fab->fab_prov = FAB_FABRIC_PROV_MAX;
2979  rc = libfab_passive_ep_create(ftm->ftm_pep, ftm);
2980  if (rc != 0)
2981  return M0_ERR(rc);
2982 
2983  nep = &ftm->ftm_pep->fep_nep;
2984  nep->nep_xprt_pvt = ftm->ftm_pep;
2985  nep->nep_tm = ntm;
2986  libfab_ep_pton(&ftm->ftm_pep->fep_name,
2987  &ftm->ftm_pep->fep_name_n);
2988  m0_nep_tlink_init_at_tail(nep, &ntm->ntm_end_points);
2989  ftm->ftm_pep->fep_nep.nep_addr = ftm->ftm_pep->fep_name.nia_p;
2990 
2991  m0_mutex_init(&ftm->ftm_endlock);
2992  m0_mutex_init(&ftm->ftm_evpost);
2993 
2994  rc = M0_THREAD_INIT(&ftm->ftm_poller, struct m0_fab__tm *, NULL,
2995  &libfab_poller, ftm, "libfab_tm");
2996  } else
2997  return M0_ERR(-ENOMEM);
2998 
2999  ftm->ftm_state = FAB_TM_STARTED;
3000  ftm->ftm_tmout_check = m0_time_from_now(FAB_BUF_TMOUT_CHK_INTERVAL, 0);
3001 
3002  return M0_RC(rc);
3003 }
3004 
3010 static int libfab_ma_stop(struct m0_net_transfer_mc *net, bool cancel)
3011 {
3012  struct m0_fab__tm *tm = net->ntm_xprt_private;
3013 
3014  M0_PRE(net->ntm_state == M0_NET_TM_STOPPING);
3015 
3016  if (cancel)
3018 
3019  libfab_tm_unlock(tm);
3020  libfab_tm_fini(net);
3021  libfab_tm_event_post(tm, M0_NET_TM_STOPPED);
3022  libfab_tm_lock(tm);
3023 
3024  return M0_RC(0);
3025 }
3026 
/**
 * Transport hook: confining a transfer machine to a set of processors is not
 * implemented by this transport.
 *
 * @return always -ENOSYS.
 */
static int libfab_ma_confine(struct m0_net_transfer_mc *ma,
			     const struct m0_bitmap *processors)
{
	(void)ma;
	(void)processors;

	return M0_ERR(-ENOSYS);
}
3035 
/**
 * Transport hook: return the end point for the given address in *epp.
 *
 * The work is delegated to libfab_ep_find().
 *
 * @param epp  receives the end point.
 * @param tm   transfer machine the end point belongs to.
 * @param name address of the end point.
 * @return the result of libfab_ep_find().
 */
static int libfab_end_point_create(struct m0_net_end_point **epp,
				   struct m0_net_transfer_mc *tm,
				   const char *name)
{
	int rc;

	M0_ENTRY("name=%s", name);
	rc = libfab_ep_find(tm, name, NULL, epp);
	/* Pair the entry trace with an exit trace carrying the result. */
	return M0_RC(rc);
}
3050 
/*
 * Transport hook: release the libfab-private state attached to a network
 * buffer.
 *
 * The buffer's memory registrations are closed with libfab_buf_dom_dereg(),
 * the buffer is finalised with libfab_buf_fini(), the descriptor and
 * registration arrays and the private buffer itself are freed, and
 * nb->nb_xprt_private is cleared.
 *
 * NOTE(review): this listing skips source line 3063, so the opening of the
 * precondition that ends in "libfab_buf_invariant(fb));" is not visible here.
 * NOTE(review): the result of libfab_buf_dom_dereg() is not checked; the
 * arrays are freed even if a registration could not be closed.
 */
3058 static void libfab_buf_deregister(struct m0_net_buffer *nb)
3059 {
3060  struct m0_fab__buf *fb = nb->nb_xprt_private;
3061 
3062  M0_ENTRY("fb=%p nb=%p q=%d", fb, nb, nb->nb_qtype);
3064  libfab_buf_invariant(fb));
3065 
3066  libfab_buf_dom_dereg(fb);
3067  libfab_buf_fini(fb);
3068  m0_free(fb->fb_mr.bm_desc);
3069  m0_free(fb->fb_mr.bm_mr);
3070  m0_free(fb);
3071  nb->nb_xprt_private = NULL;
3072 }
3073 
3081 static int libfab_buf_register(struct m0_net_buffer *nb)
3082 {
3083  struct m0_fab__buf *fb;
3084  struct m0_fab__ndom *nd = nb->nb_dom->nd_xprt_private;
3085 
3086  M0_ENTRY("nb=%p q=%d", nb, nb->nb_qtype);
3087 
3088  M0_PRE(nb->nb_xprt_private == NULL);
3089  M0_PRE(nb->nb_dom != NULL);
3090 
3091  M0_ALLOC_PTR(fb);
3092  if (fb == NULL)
3093  return M0_ERR(-ENOMEM);
3094 
3095  M0_ALLOC_ARR(fb->fb_mr.bm_desc, nd->fnd_seg_nr);
3096  M0_ALLOC_ARR(fb->fb_mr.bm_mr, nd->fnd_seg_nr);
3097 
3098  if (fb->fb_mr.bm_desc == NULL || fb->fb_mr.bm_mr == NULL) {
3099  m0_free(fb->fb_mr.bm_desc);
3100  m0_free(fb->fb_mr.bm_mr);
3101  m0_free(fb);
3102  return M0_ERR(-ENOMEM);
3103  }
3104 
3105  fab_buf_tlink_init(fb);
3106  nb->nb_xprt_private = fb;
3107  fb->fb_nb = nb;
3108  fb->fb_state = FAB_BUF_INITIALIZED;
3109 
3110  return M0_RC(0);
3111 }
3112 
/*
 * Transport hook: queue a network buffer on its transfer machine.
 *
 * A fresh token is generated for the buffer, the buffer is registered via
 * libfab_buf_dom_reg(), and the action taken then depends on nb->nb_qtype:
 * a receive is posted, a send/bulk operation is queued (or a connection is
 * initiated when the endpoint is not connected yet), or a buffer descriptor
 * is encoded. On success the buffer is marked FAB_BUF_QUEUED and added to the
 * transfer machine's buffer hash table.
 *
 * Returns 0 on success, otherwise the error of the failing step.
 *
 * NOTE(review): the listing numbers skip 3155, 3178, 3188, 3201 and 3205;
 * judging by the surrounding braces and comments, most of these held the
 * "case" labels of the passive and active bulk branches, which are therefore
 * not visible below.
 * NOTE(review): the result of libfab_buf_dom_reg() is not checked.
 */
3120 static int libfab_buf_add(struct m0_net_buffer *nb)
3121 {
3122  struct m0_fab__buf *fbp = nb->nb_xprt_private;
3123  struct m0_fab__tm *ma = libfab_buf_ma(nb);
3124  struct m0_fab__ep *ep = NULL;
3125  struct m0_fab__active_ep *aep;
3126  struct iovec iv;
3127  struct m0_net_ip_addr addr = {};
3128  int ret = 0;
3129 
3130  M0_ENTRY("fb=%p nb=%p q=%d l=%"PRIu64, fbp, nb, nb->nb_qtype,
3131  nb->nb_length);
3132 
3133  M0_PRE(libfab_tm_is_locked(ma) && libfab_tm_invariant(ma) &&
3134  libfab_buf_invariant(fbp));
3135  M0_PRE(nb->nb_offset == 0); /* Do not support an offset during add. */
3136  M0_PRE((nb->nb_flags & M0_NET_BUF_RETAIN) == 0);
3137 
3138  fab_buf_tlink_init(fbp);
3139  fbp->fb_token = libfab_buf_token_get(ma, fbp);
3140  libfab_buf_dom_reg(nb, ma);
3141  fbp->fb_status = 0;
3142 
3143  switch (nb->nb_qtype) {
 /* Message receive: post the single segment on the receive context. */
3144  case M0_NET_QT_MSG_RECV: {
3145  M0_ASSERT(nb->nb_buffer.ov_vec.v_nr == 1);
3146  fbp->fb_length = nb->nb_length;
3147  iv.iov_base = nb->nb_buffer.ov_buf[0];
3148  iv.iov_len = nb->nb_buffer.ov_vec.v_count[0];
3149  ret = fi_recvv(ma->ftm_rctx, &iv, fbp->fb_mr.bm_desc, 1, 0,
3150  U32_TO_VPTR(fbp->fb_token));
3151  break;
3152  }
3153 
 /*
  * Message send: make sure the end point has transport state, then either
  * start connecting or queue the buffer and process the queue.
  */
3154  case M0_NET_QT_MSG_SEND: {
3156  M0_ASSERT(nb->nb_buffer.ov_vec.v_nr == 1);
3157  M0_ASSERT(nb->nb_ep != NULL);
3158 
3159  if (nb->nb_ep->nep_xprt_pvt == NULL)
3160  ret = libfab_ep_create(ma->ftm_ntm, nb->nb_ep->nep_addr,
3161  NULL, &nb->nb_ep);
3162  if (ret != 0)
3163  break;
3164  ep = nb->nb_ep->nep_xprt_pvt;
3165  aep = libfab_aep_get(ep);
3166  fbp->fb_txctx = ep;
3167 
3168  if (aep->aep_tx_state != FAB_CONNECTED)
3169  ret = libfab_conn_init(ep, ma, fbp);
3170  else {
3171  ret = libfab_txbuf_list_add(ma, fbp, aep);
3172  libfab_bufq_process(ma);
3173  }
3174  break;
3175  }
3176 
3177  /* For passive buffers, generate the buffer descriptor. */
3179  fbp->fb_length = nb->nb_length;
3180  if (!libfab_is_verbs(ma)) {
3181  ret = libfab_bdesc_encode(fbp);
3182  break;
3183  }
3184  /* else
3185  Intentional fall through */
3186  }
3187 
 /*
  * If no message-receive buffer is queued on the transfer machine, post a
  * receive on the buffer's dummy area before encoding the descriptor.
  */
3189  if (m0_net_tm_tlist_is_empty(
3190  &ma->ftm_ntm->ntm_q[M0_NET_QT_MSG_RECV]))
3191  ret = fi_recv(ma->ftm_rctx, fbp->fb_dummy,
3192  sizeof(fbp->fb_dummy), NULL, 0,
3193  U32_TO_VPTR(fbp->fb_token));
3194 
3195  if (ret == 0)
3196  ret = libfab_bdesc_encode(fbp);
3197  break;
3198  }
3199 
3200  /* For active buffers, decode the passive buffer descriptor */
3202  fbp->fb_length = nb->nb_length;
3203  /* Intentional fall through */
3204 
 /*
  * Find the end point for the address decoded from the descriptor, then
  * either start connecting or queue the buffer and process the queue.
  */
3206  libfab_bdesc_decode(fbp, &addr.nia_n);
3207  ret = libfab_fab_ep_find(ma, NULL, &addr.nia_n, &ep);
3208  if (ret != 0)
3209  break;
3210  fbp->fb_txctx = ep;
3211  aep = libfab_aep_get(ep);
3212  if (aep->aep_tx_state != FAB_CONNECTED)
3213  ret = libfab_conn_init(ep, ma, fbp);
3214  else {
3215  ret = libfab_txbuf_list_add(ma, fbp, aep);
3216  libfab_bufq_process(ma);
3217  }
3218  break;
3219  }
3220 
3221  default:
3222  M0_IMPOSSIBLE("invalid queue type: %x", nb->nb_qtype);
3223  break;
3224  }
3225 
 /* Only a successfully queued buffer is tracked in the hash table. */
3226  if (ret == 0) {
3227  fbp->fb_state = FAB_BUF_QUEUED;
3228  m0_tlink_init(&fab_bufhash_tl, fbp);
3229  fab_bufhash_htable_add(&ma->ftm_bufhash.bht_hash, fbp);
3230  }
3231 
3232  return M0_RC(ret);
3233 }
3234 
3242 static void libfab_buf_del(struct m0_net_buffer *nb)
3243 {
3244  struct m0_fab__buf *buf = nb->nb_xprt_private;
3245  struct m0_fab__tm *ma = libfab_buf_ma(nb);
3246 
3247  M0_PRE(libfab_tm_is_locked(ma) && libfab_tm_invariant(ma) &&
3248  libfab_buf_invariant(buf));
3250 
3251  libfab_buf_dom_dereg(buf);
3252  buf->fb_state = FAB_BUF_CANCELED;
3253  libfab_buf_done(buf, -ECANCELED, false);
3254 }
3255 
/**
 * Transport hook for synchronous buffer event delivery; this transport has
 * nothing to do here.
 *
 * @return always 0.
 */
static int libfab_bev_deliver_sync(struct m0_net_transfer_mc *ma)
{
	(void)ma;

	return 0;
}
3265 
/**
 * Transport hook for delivering all pending buffer events; intentionally a
 * no-op in this transport.
 */
static void libfab_bev_deliver_all(struct m0_net_transfer_mc *ma)
{
	(void)ma;
}
3275 
/**
 * Transport hook reporting whether buffer events are pending.
 *
 * @return always false.
 */
static bool libfab_bev_pending(struct m0_net_transfer_mc *ma)
{
	(void)ma;

	return false;
}
3285 
/**
 * Transport hook for requesting buffer event notification on a channel;
 * intentionally a no-op in this transport.
 */
static void libfab_bev_notify(struct m0_net_transfer_mc *ma,
			      struct m0_chan *chan)
{
	(void)ma;
	(void)chan;
}
3296 
3304 static m0_bcount_t libfab_get_max_buf_size(const struct m0_net_domain *dom)
3305 {
3306  struct m0_fab__ndom *nd = dom->nd_xprt_private;
3307 
3308  return (m0_bcount_t)(nd->fnd_seg_size * nd->fnd_seg_nr);
3309 }
3310 
3318 static m0_bcount_t libfab_get_max_buf_seg_size(const struct m0_net_domain *dom)
3319 {
3320  return ((struct m0_fab__ndom *)dom->nd_xprt_private)->fnd_seg_size;
3321 }
3322 
3330 static int32_t libfab_get_max_buf_segments(const struct m0_net_domain *dom)
3331 {
3332  return ((struct m0_fab__ndom *)dom->nd_xprt_private)->fnd_seg_nr;
3333 }
3341 static m0_bcount_t libfab_get_max_buf_desc_size(const struct m0_net_domain *dom)
3342 {
3343  struct m0_fab__ndom *nd = dom->nd_xprt_private;
3344  m0_bcount_t max_bd_size = sizeof(struct fi_rma_iov);
3345 
3346  max_bd_size = (max_bd_size * nd->fnd_seg_nr) +
3347  sizeof(struct m0_fab__bdesc);
3348 
3349  return max_bd_size;
3350 }
3351 
3359 static m0_bcount_t libfab_rpc_max_seg_size(struct m0_net_domain *ndom)
3360 {
3361  M0_PRE(ndom != NULL);
3362  return FAB_MAX_RPC_SEG_SIZE;
3363 }
3364 
3372 static uint32_t libfab_rpc_max_segs_nr(struct m0_net_domain *ndom)
3373 {
3374  M0_PRE(ndom != NULL);
3375  return FAB_MAX_RPC_SEG_NR;
3376 }
3377 
3385 static m0_bcount_t libfab_rpc_max_msg_size(struct m0_net_domain *ndom,
3386  m0_bcount_t rpc_size)
3387 {
3388  m0_bcount_t mbs;
3389  M0_PRE(ndom != NULL);
3390 
3391  mbs = libfab_rpc_max_seg_size(ndom) * libfab_rpc_max_segs_nr(ndom);
3392  return rpc_size != 0 ? m0_clip64u(M0_SEG_SIZE, mbs, rpc_size) : mbs;
3393 }
3394 
3402 static uint32_t libfab_rpc_max_recv_msgs(struct m0_net_domain *ndom,
3403  m0_bcount_t rpc_size)
3404 {
3405  M0_PRE(ndom != NULL);
3406  return FAB_MAX_RPC_RECV_MSG_NR;
3407 }
3408 
3409 static const struct m0_net_xprt_ops libfab_xprt_ops = {
3410  .xo_dom_init = &libfab_dom_init,
3411  .xo_dom_fini = &libfab_dom_fini,
3412  .xo_tm_init = &libfab_ma_init,
3413  .xo_tm_confine = &libfab_ma_confine,
3414  .xo_tm_start = &libfab_ma_start,
3415  .xo_tm_stop = &libfab_ma_stop,
3416  .xo_tm_fini = &libfab_ma_fini,
3417  .xo_end_point_create = &libfab_end_point_create,
3418  .xo_buf_register = &libfab_buf_register,
3419  .xo_buf_deregister = &libfab_buf_deregister,
3420  .xo_buf_add = &libfab_buf_add,
3421  .xo_buf_del = &libfab_buf_del,
3422  .xo_bev_deliver_sync = &libfab_bev_deliver_sync,
3423  .xo_bev_deliver_all = &libfab_bev_deliver_all,
3424  .xo_bev_pending = &libfab_bev_pending,
3425  .xo_bev_notify = &libfab_bev_notify,
3426  .xo_get_max_buffer_size = &libfab_get_max_buf_size,
3427  .xo_get_max_buffer_segment_size = &libfab_get_max_buf_seg_size,
3428  .xo_get_max_buffer_segments = &libfab_get_max_buf_segments,
3429  .xo_get_max_buffer_desc_size = &libfab_get_max_buf_desc_size,
3430  .xo_rpc_max_seg_size = &libfab_rpc_max_seg_size,
3431  .xo_rpc_max_segs_nr = &libfab_rpc_max_segs_nr,
3432  .xo_rpc_max_msg_size = &libfab_rpc_max_msg_size,
3433  .xo_rpc_max_recv_msgs = &libfab_rpc_max_recv_msgs,
3434 };
3435 
3436 struct m0_net_xprt m0_net_libfab_xprt = {
3437  .nx_name = "libfab",
3438  .nx_ops = &libfab_xprt_ops
3439 };
3440 M0_EXPORTED(m0_net_libfab_xprt);
3441 
3442 #else /* ENABLE_LIBFAB */
3443 
3444 /* libfab init and fini() : initialized in motr init */
3445 M0_INTERNAL int m0_net_libfab_init(void)
3446 {
3447  return M0_RC(0);
3448 }
3449 
3450 M0_INTERNAL void m0_net_libfab_fini(void)
3451 {
3452 
3453 }
3454 
3455 #endif /* ENABLE_LIBFAB */
3456 
3457 #undef M0_TRACE_SUBSYSTEM
3458 
3461 /*
3462  * Local variables:
3463  * c-indentation-style: "K&R"
3464  * c-basic-offset: 8
3465  * tab-width: 8
3466  * fill-column: 80
3467  * scroll-step: 1
3468  * End:
3469  */
3470 /*
3471  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
3472  */
union m0_net_ip_params::@378 nip_fmt_pvt
M0_INTERNAL int m0_mutex_trylock(struct m0_mutex *mutex)
Definition: mutex.c:84
static void ptr(struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
Definition: dump.c:440
struct m0_net_ip_lnet_addr la
Definition: ip.h:81
static size_t nr
Definition: dump.c:1505
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0t1fs_fsync_interactions fi
Definition: fsync.c:55
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
enum m0_net_ip_format nip_format
Definition: ip.h:73
#define m0_htable_for(name, var, htable)
Definition: hash.h:483
#define M0_HT_DESCR_DEFINE(name, htname, scope, amb_type, amb_link_field, amb_magic_field, amb_magic, head_magic, key_field, hash_func, key_eq)
Definition: hash.h:326
static uint32_t seg_nr
Definition: net.c:119
M0_INTERNAL void m0_net_libfab_fini(void)
Definition: libfab.c:3450
#define NULL
Definition: misc.h:38
m0_bindex_t nb_offset
Definition: net.h:1344
static struct m0_bufvec dst
Definition: xform.c:61
m0_net_ip_format
Definition: ip.h:33
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL int m0_net_hostname_to_ip(const char *hostname, char *ip, enum m0_net_ip_format *fmt)
Definition: ip.c:422
M0_INTERNAL void m0_net__tm_cancel(struct m0_net_transfer_mc *tm)
Definition: tm.c:145
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
uint32_t nbd_len
Definition: net_otw_types.h:37
union m0_net_ip_params::@377 nip_ip_n
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
uint16_t nia_family
Definition: ip.h:59
uint8_t * nbd_data
Definition: net_otw_types.h:38
struct m0_vec ov_vec
Definition: vec.h:147
Definition: sock.c:772
struct m0_bufvec data
Definition: di.c:40
m0_bcount_t nb_length
Definition: net.h:1334
uint64_t nb_flags
Definition: net.h:1489
struct m0_net_domain * ntm_dom
Definition: net.h:853
M0_INTERNAL void m0_net_xprt_default_set(const struct m0_net_xprt *xprt)
Definition: net.c:143
uint64_t m0_bindex_t
Definition: types.h:80
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
Definition: buf.c:314
struct m0_net_ip_params nia_n
Definition: ip.h:87
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
static int struct dentry int struct nameidata * nd
Definition: dir.c:593
void ** ov_buf
Definition: vec.h:149
#define PRIx64
Definition: types.h:61
Definition: sock.c:887
struct m0_tl ntm_end_points
Definition: net.h:856
#define m0_tl_endfor
Definition: tlist.h:700
char nia_p[M0_NET_IP_STRLEN_MAX]
Definition: ip.h:88
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL int m0_net_ip_print(const struct m0_net_ip_addr *nia)
Definition: ip.c:356
return M0_RC(rc)
op
Definition: libdemo.c:64
Definition: sock.c:754
static uint64_t m0_clip64u(uint64_t lo, uint64_t hi, uint64_t x)
Definition: arith.h:131
#define M0_ENTRY(...)
Definition: trace.h:170
uint16_t nip_port
Definition: ip.h:78
static char * addr
Definition: node_k.c:37
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
M0_INTERNAL void m0_net_xprt_register(const struct m0_net_xprt *xprt)
Definition: net.c:182
struct m0_fid new_fid
Definition: dir.c:625
bool m0_time_is_in_past(m0_time_t t)
Definition: time.c:102
int32_t nbe_status
Definition: net.h:1218
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
void * ntm_xprt_private
Definition: net.h:886
Definition: cnt.h:36
static int key
Definition: locality.c:283
const char * name
Definition: trace.c:110
struct m0_net_ip_inet_addr ia
Definition: ip.h:80
Definition: refs.h:34
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
if(value==NULL)
Definition: dir.c:350
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
void * nd_xprt_private
Definition: net.h:393
#define M0_ASSERT(cond)
m0_time_t nb_timeout
Definition: net.h:1387
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
void * nep_xprt_pvt
Definition: net.h:505
m0_time_t m0_time_now(void)
Definition: time.c:134
#define M0_DEFAULT_NETWORK
Definition: config.h:281
m0_net_tm_state
Definition: net.h:630
#define m0_streq(a, b)
Definition: string.h:34
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
static struct m0_stob_domain * dom
Definition: storage.c:38
struct m0_net_domain * nb_dom
Definition: net.h:1351
void * m0_alloc(size_t size)
Definition: memory.c:126
M0_INTERNAL void m0_net_tm_event_post(const struct m0_net_tm_event *ev)
Definition: tm.c:84
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
#define M0_POST(cond)
Definition: xcode.h:73
static void token(struct ff2c_context *ctx, struct ff2c_term *term, struct ff2c_token *tok)
Definition: parser.c:66
uint32_t v_nr
Definition: vec.h:51
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
Definition: chan.h:229
struct m0_net_transfer_mc * nep_tm
Definition: net.h:493
m0_bcount_t * v_count
Definition: vec.h:53
static uint64_t min64u(uint64_t a, uint64_t b)
Definition: arith.h:66
M0_INTERNAL void m0_tlink_init(const struct m0_tl_descr *d, void *obj)
Definition: tlist.c:63
#define M0_TL_DEFINE(name, scope, amb_type)
Definition: tlist.h:550
static struct fdmi_ctx ctx
Definition: main.c:80
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
uint16_t nla_tmid
Definition: ip.h:67
static uint32_t min32u(uint32_t a, uint32_t b)
Definition: arith.h:56
char * ep
Definition: sw.h:132
M0_INTERNAL bool m0_net__buffer_invariant(const struct m0_net_buffer *buf)
Definition: buf.c:46
static struct m0_chan chan[RDWR_REQUEST_MAX]
Definition: glob.c:32
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
M0_INTERNAL int m0_net_libfab_init(void)
Definition: libfab.c:3445
M0_INTERNAL struct m0_thread * m0_thread_self(void)
Definition: thread.c:122
Definition: addb2.c:200
static struct m0_rm_remote * remote
Definition: rm_fops.c:35
M0_INTERNAL int m0_net_ip_parse(const char *name, struct m0_net_ip_addr *addr)
Definition: ip.c:350
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
#define M0_TL_DESCR_DEFINE(name, hname, scope, amb_type, amb_link_field, amb_magic_field, amb_magic, head_magic)
Definition: tlist.h:535
const char * nx_name
Definition: net.h:125
struct m0t1fs_filedata * fd
Definition: dir.c:1030
M0_INTERNAL void m0_net_xprt_deregister(const struct m0_net_xprt *xprt)
Definition: net.c:197
int(* xo_dom_init)(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: net.h:139
static uint64_t found
Definition: base.c:376
static struct m0_addb2_net * net
Definition: net.c:27
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
#define likely(x)
Definition: assert.h:70
uint32_t sn[4]
Definition: ip.h:76
Definition: nucleus.c:42
#define out(...)
Definition: gen.c:41
struct m0_net_xprt * xprt
Definition: module.c:61
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL bool m0_processor_is_vm(void)
Definition: processor.c:1171
#define m0_htable_endfor
Definition: hash.h:491
void * nb_xprt_private
Definition: net.h:1461
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
#define M0_BASSERT(cond)
int const char void * buffer
Definition: dir.c:435
struct m0_net_end_point * nb_ep
Definition: net.h:1424
#define M0_HT_DEFINE(name, scope, amb_type, key_type)
Definition: hash.h:377
Definition: idx_mock.c:47
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL bool m0_net_ip_addr_eq(const struct m0_net_ip_addr *addr1, const struct m0_net_ip_addr *addr2, bool is_ncmp)
Definition: ip.c:471