Motr  M0
service.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2021 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CAS
24 #include "be/op.h"
25 #include "be/tx_credit.h"
26 #include "be/dtm0_log.h" /* m0_be_dtm0_log API */
27 #include "dtm0/fop.h" /* DTM0 msg and tx_desc */
28 #include "dtm0/service.h" /* m0_dtm0_service API */
29 #include "lib/trace.h"
30 #include "lib/memory.h"
31 #include "lib/finject.h"
32 #include "lib/assert.h"
33 #include "lib/arith.h" /* min_check, M0_3WAY */
34 #include "lib/misc.h" /* M0_IN */
35 #include "lib/errno.h" /* ENOMEM, EPROTO */
36 #include "lib/ext.h"
37 #include "fop/fom_long_lock.h"
38 #include "fop/fom_generic.h"
39 #include "fop/fom_interpose.h"
40 #include "reqh/reqh_service.h"
41 #include "reqh/reqh.h" /* m0_reqh */
42 #include "rpc/rpc_opcodes.h"
43 #include "rpc/rpc_machine.h" /* m0_rpc_machine */
44 #include "conf/schema.h" /* M0_CST_CAS */
45 #include "module/instance.h"
46 #include "addb2/addb2.h"
47 #include "cas/ctg_store.h"
48 #include "pool/pool.h" /* m0_pool_nd_state */
49 
50 #include "dix/cm/cm.h"
51 #include "dix/fid_convert.h" /* m0_dix_fid_cctg_device_id */
52 
53 #include "cas/cas_addb2.h" /* M0_AVI_CAS_KV_SIZES */
54 #include "cas/cas.h"
55 #include "cas/cas_xc.h"
56 #include "cas/index_gc.h"
57 #include "motr/setup.h" /* m0_reqh_context */
58 
292 enum stats_kv {
296 };
297 
302 };
303 
304 enum {
309 };
310 
311 struct cas_service {
314 };
315 
/**
 * Incoming key/value pair of one input CAS record.
 *
 * An array of these (cas_fom::cf_ikv) is filled by cas_incoming_kv_setup()
 * and read back through cas_incoming_kv().
 */
struct cas_kv {
	/** Key of the input record, as obtained via m0_rpc_at_get(). */
	struct m0_buf ckv_key;
	/**
	 * Value of the input record; set to M0_BUF_INIT0 when the incoming
	 * record carries no value.
	 */
	struct m0_buf ckv_val;
};
320 
321 struct cas_fom {
322  struct m0_fom cf_fom;
323  uint64_t cf_ipos;
324  uint64_t cf_opos;
337  uint64_t cf_curpos;
344  struct cas_kv *cf_ikv;
346  uint64_t cf_ikv_nr;
354  uint64_t cf_in_cids_nr;
361  /* ADDB2 structures to collect long-lock contention metrics. */
367  /* AT helper fields. */
370  /* Statistical counters. */
371  uint64_t cf_kv_stats[STATS_KV_NR]
374 };
375 
390 
397 
416 };
417 
423 
/** Address of the index mask (u.dl_desc.ld_imask) embedded in layout @l. */
#define LAYOUT_IMASK_PTR(l)   (&((l)->u.dl_desc.ld_imask))
/** Address of the index mask of the layout carried by CAS id @cid. */
#define CID_IMASK_PTR(cid)    LAYOUT_IMASK_PTR(&((cid)->ci_layout))
426 
427 static int cas_service_start (struct m0_reqh_service *service);
428 static void cas_service_stop (struct m0_reqh_service *service);
429 static void cas_service_fini (struct m0_reqh_service *service);
430 static size_t cas_fom_home_locality (const struct m0_fom *fom);
432  const struct m0_reqh_service_type *st);
433 
434 static struct m0_cas_op *cas_op (const struct m0_fom *fom);
435 static const struct m0_fid *cas_fid (const struct m0_fom *fom);
436 static enum m0_cas_type cas_type (const struct m0_fom *fom);
437 static struct m0_cas_rec *cas_at (struct m0_cas_op *op, int idx);
438 static struct m0_cas_rec *cas_out_at (const struct m0_cas_rep *rep, int idx);
439 static bool cas_is_ro (enum m0_cas_opcode opc);
440 static enum m0_cas_opcode m0_cas_opcode (const struct m0_fop *fop);
441 static uint64_t cas_in_nr (const struct m0_fop *fop);
442 static uint64_t cas_out_nr (const struct m0_fop *fop);
443 static void cas_prep (struct cas_fom *fom,
444  enum m0_cas_opcode opc,
445  enum m0_cas_type ct,
446  struct m0_cas_ctg *ctg,
447  uint64_t rec_pos,
448  struct m0_be_tx_credit *accum);
449 static int cas_exec (struct cas_fom *fom,
450  enum m0_cas_opcode opc,
451  enum m0_cas_type ct,
452  struct m0_cas_ctg *ctg,
453  uint64_t rec_pos, int next);
454 
455 static int cas_done(struct cas_fom *fom, struct m0_cas_op *op,
456  struct m0_cas_rep *rep, enum m0_cas_opcode opc);
457 
458 static bool cas_ctidx_op_needed(struct cas_fom *fom, enum m0_cas_opcode opc,
459  enum m0_cas_type ct, uint64_t rec_pos);
460 
461 static bool cas_key_need_to_send(struct cas_fom *fom, enum m0_cas_opcode opc,
462  enum m0_cas_type ct, struct m0_cas_op *op,
463  uint64_t rec_pos);
464 
465 static int cas_prep_send(struct cas_fom *fom, enum m0_cas_opcode opc,
466  enum m0_cas_type ct);
467 
468 static bool cas_is_valid (struct cas_fom *fom, enum m0_cas_opcode opc,
469  enum m0_cas_type ct, const struct m0_cas_rec *rec,
470  uint64_t rec_pos);
471 static int cas_op_recs_check(struct cas_fom *fom, enum m0_cas_opcode opc,
472  enum m0_cas_type ct, struct m0_cas_op *op);
473 
474 static bool cas_fom_invariant(const struct cas_fom *fom);
475 
476 static int cas_buf_cid_decode(struct m0_buf *enc_buf,
477  struct m0_cas_id *cid);
478 static bool cas_fid_is_cctg(const struct m0_fid *fid);
479 static int cas_id_check(const struct m0_cas_id *cid);
480 static int cas_device_check(const struct cas_fom *fom,
481  const struct m0_cas_id *cid);
482 static int cas_op_check(struct m0_cas_op *op,
483  struct cas_fom *fom,
484  bool is_index_drop);
485 static void cas_incoming_kv(const struct cas_fom *fom,
486  uint64_t rec_pos,
487  struct m0_buf *key,
488  struct m0_buf *val);
489 static int cas_incoming_kv_setup(struct cas_fom *fom,
490  const struct m0_cas_op *op);
491 
492 static int cas_kv_load_done(struct cas_fom *fom, enum m0_cas_opcode opc,
493  const struct m0_cas_op *op, int phase);
494 
495 static int cas_ctg_crow_handle(struct cas_fom *fom,
496  const struct m0_cas_id *cid);
497 
498 static int cas_ctidx_mem_place(struct cas_fom *fom,
499  const struct m0_cas_id *in_cid, int next);
500 static int cas_ctidx_mem_free(struct cas_fom *fom, int next);
501 static int cas_ctidx_insert(struct cas_fom *fom, const struct m0_cas_id *in_cid,
502  int next);
503 static int cas_ctidx_lookup(struct cas_fom *fom, const struct m0_cas_id *in_cid,
504  int next);
505 static int cas_ctidx_delete(struct cas_fom *fom, const struct m0_cas_id *in_cid,
506  int next);
507 
510 static const struct m0_fom_ops cas_fom_ops;
511 static const struct m0_fom_type_ops cas_fom_type_ops;
512 static struct m0_sm_conf cas_sm_conf;
514 
515 M0_INTERNAL void m0_cas_svc_init(void)
516 {
526  m0_cas_gc_init();
527 }
528 
529 M0_INTERNAL void m0_cas_svc_fini(void)
530 {
531  m0_cas_gc_fini();
534 }
535 
536 M0_INTERNAL void m0_cas_svc_fop_args(struct m0_sm_conf **sm_conf,
537  const struct m0_fom_type_ops **fom_ops,
538  struct m0_reqh_service_type **svctype)
539 {
540  *sm_conf = &cas_sm_conf;
541  *fom_ops = &cas_fom_type_ops;
542  *svctype = &m0_cas_service_type;
543 }
544 
545 M0_INTERNAL void m0_cas__ut_svc_be_set(struct m0_reqh_service *svc,
546  struct m0_be_domain *dom)
547 {
549  service->c_be_domain = dom;
550 }
551 
552 M0_INTERNAL struct m0_be_domain *
554 {
556  return service->c_be_domain;
557 }
558 
559 
561 {
562  int rc;
564  struct m0_be_domain *ut_dom;
565 
567  ut_dom = m0_cas__ut_svc_be_get(svc);
568  /* XXX It's a workaround. It's needed until we have a better way. */
569  service->c_be_domain = ut_dom != NULL ?
570  ut_dom : svc->rs_reqh_ctx->rc_beseg->bs_domain;
571  rc = m0_ctg_store_init(service->c_be_domain);
572  if (rc == 0) {
573  /*
574  * Start deleted index garbage collector at boot to continue
575  * index drop possibly interrupted by system restart.
576  * If no pending index drop, it finishes soon.
577  */
579  }
580  return rc;
581 }
582 
584 {
585  /* Wait until garbage collector destroys all dead indices. */
587 }
588 
590 {
593 }
594 
596 {
598 
601  m0_free(service);
602 }
603 
605  const struct m0_reqh_service_type *stype)
606 {
607  struct cas_service *service;
608 
610  if (service != NULL) {
611  *svc = &service->c_service;
612  (*svc)->rs_type = stype;
613  (*svc)->rs_ops = &cas_service_ops;
614  return M0_RC(0);
615  } else
616  return M0_ERR(-ENOMEM);
617 }
618 
619 static bool cas_service_started(struct m0_fop *fop,
620  struct m0_reqh *reqh)
621 {
622  struct m0_reqh_service *svc;
623  const struct m0_reqh_service_type *stype;
624 
626  M0_ASSERT(stype != NULL);
627 
629  M0_ASSERT(svc != NULL);
630 
632 }
633 
634 static int cas_fom_create(struct m0_fop *fop,
635  struct m0_fom **out, struct m0_reqh *reqh)
636 {
637  struct cas_fom *fom;
638  struct m0_fom *fom0;
639  struct m0_fop *repfop;
640  struct m0_cas_rep *repdata;
641  struct m0_cas_rec *repv;
642  struct cas_kv *ikv;
643  uint64_t in_nr;
644  uint64_t out_nr;
645 
647  return M0_ERR(-EAGAIN);
648 
649  M0_ALLOC_PTR(fom);
655  in_nr = cas_in_nr(fop);
656  if (in_nr != 0)
657  M0_ALLOC_ARR(ikv, in_nr);
658  else
659  ikv = NULL;
660 
661  out_nr = cas_out_nr(fop);
662  repfop = m0_fop_reply_alloc(fop, &cas_rep_fopt);
670  if (out_nr != 0)
671  M0_ALLOC_ARR(repv, out_nr);
672  else
673  repv = NULL;
674  if (fom != NULL && repfop != NULL &&
675  (in_nr == 0 || ikv != NULL) &&
676  (out_nr == 0 || repv != NULL)) {
677  *out = fom0 = &fom->cf_fom;
678  fom->cf_ikv = ikv;
679  fom->cf_ikv_nr = in_nr;
680  repdata = m0_fop_data(repfop);
681  repdata->cgr_rep.cr_nr = out_nr;
682  repdata->cgr_rep.cr_rec = repv;
684  &cas_fom_ops, fop, repfop, reqh);
685  m0_long_lock_link_init(&fom->cf_lock, fom0,
686  &fom->cf_lock_addb2);
687  m0_long_lock_link_init(&fom->cf_meta, fom0,
688  &fom->cf_meta_addb2);
689  m0_long_lock_link_init(&fom->cf_ctidx, fom0,
690  &fom->cf_ctidx_addb2);
691  m0_long_lock_link_init(&fom->cf_dead_index, fom0,
692  &fom->cf_dead_index_addb2);
693  m0_long_lock_link_init(&fom->cf_del_lock, fom0,
694  &fom->cf_del_lock_addb2);
695  return M0_RC(0);
696  } else {
697  m0_free(ikv);
698  m0_free(repfop);
699  m0_free(repv);
700  m0_free(fom);
701  return M0_ERR(-ENOMEM);
702  }
703 }
704 
705 static int cas_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom,
706  int next_phase)
707 {
708  if (cas_in_ut()) {
709  m0_fom_phase_set(fom, next_phase);
710  return M0_FSO_AGAIN;
711  }
712 
713  return m0_rpc_at_load(ab, fom, next_phase);
714 }
715 
716 static int cas_at_reply(struct m0_rpc_at_buf *in,
717  struct m0_rpc_at_buf *out,
718  struct m0_buf *repbuf,
719  struct m0_fom *fom,
720  int next_phase)
721 {
722  if (cas_in_ut()) {
723  m0_fom_phase_set(fom, next_phase);
724  out->ab_type = M0_RPC_AT_INLINE;
725  out->u.ab_buf = *repbuf;
726  return M0_FSO_AGAIN;
727  }
728 
729  return m0_rpc_at_reply(in, out, repbuf, fom, next_phase);
730 }
731 
732 static void cas_at_fini(struct m0_rpc_at_buf *ab)
733 {
734  if (cas_in_ut()) {
735  ab->ab_type = M0_RPC_AT_EMPTY;
736  return;
737  }
738 
739  m0_rpc_at_fini(ab);
740 }
741 
742 static void cas_incoming_kv(const struct cas_fom *fom,
743  uint64_t rec_pos,
744  struct m0_buf *key,
745  struct m0_buf *val)
746 {
747  *key = fom->cf_ikv[rec_pos].ckv_key;
748  *val = fom->cf_ikv[rec_pos].ckv_val;
749 }
750 
751 static void cas_update_kv_stats(struct cas_fom *fom,
752  const struct m0_rpc_at_buf *ab,
753  m0_bcount_t nob,
754  enum stats_kv kv,
755  enum stats_kv_io kv_io)
756 {
757  if (ab->ab_type == M0_RPC_AT_INLINE)
758  fom->cf_kv_stats[kv][kv_io][STATS_NR_INLINE]++;
759  else if (M0_IN(ab->ab_type, (M0_RPC_AT_BULK_SEND,
762  fom->cf_kv_stats[kv][kv_io][STATS_NR_BULK]++;
763  fom->cf_kv_stats[kv][kv_io][STATS_SIZE] += nob;
764 }
765 
766 static int cas_incoming_kv_setup(struct cas_fom *fom,
767  const struct m0_cas_op *op)
768 {
769  uint64_t i;
770  struct m0_cas_rec *rec;
771  struct m0_buf *key;
772  struct m0_buf *val;
773  int rc = 0;
774 
775  for (i = 0; i < op->cg_rec.cr_nr && rc == 0; i++) {
776  rec = &op->cg_rec.cr_rec[i];
777  key = &fom->cf_ikv[i].ckv_key;
778  val = &fom->cf_ikv[i].ckv_val;
779 
781  rc = m0_rpc_at_get(&rec->cr_key, key);
782  if (rc == 0) {
783  cas_update_kv_stats(fom, &rec->cr_key, key->b_nob,
785  if (m0_rpc_at_is_set(&rec->cr_val)) {
786  rc = m0_rpc_at_get(&rec->cr_val, val);
787  if (rc == 0)
789  &rec->cr_val,
790  val->b_nob,
791  STATS_VAL,
792  STATS_KV_IN);
793  } else
794  *val = M0_BUF_INIT0;
795  }
796  }
797 
798  return M0_RC(rc);
799 }
800 
806  const struct m0_cas_op *op,
807  bool key,
808  size_t opos)
809 {
810  struct m0_rpc_at_buf *ret;
811  struct m0_cas_rec *rec = NULL;
812  uint64_t i = 0;
813 
814  switch (opc) {
815  case CO_PUT:
816  case CO_DEL:
817  case CO_GC:
818  ret = NULL;
819  break;
820  case CO_GET:
821  ret = key ? NULL : &op->cg_rec.cr_rec[opos].cr_val;
822  break;
823  case CO_CUR:
824  while (opos >= op->cg_rec.cr_rec[i].cr_rc) {
825  opos -= op->cg_rec.cr_rec[i].cr_rc;
826  i++;
827  }
828  if (i < op->cg_rec.cr_nr)
829  rec = &op->cg_rec.cr_rec[i];
830  if (rec != NULL && rec->cr_kv_bufs.cv_nr != 0) {
831  struct m0_cas_kv_vec *kv;
832 
833  kv = &rec->cr_kv_bufs;
834  if (opos < kv->cv_nr)
835  ret = key ? &kv->cv_rec[opos].ck_key :
836  &kv->cv_rec[opos].ck_val;
837  else
838  ret = NULL;
839  } else
840  ret = NULL;
841  break;
842  default:
843  M0_IMPOSSIBLE("Invalid opcode.");
844  }
845  return ret;
846 }
847 
858 static int cas_key_send(struct cas_fom *fom,
859  const struct m0_cas_op *op,
860  enum m0_cas_opcode opc,
861  const struct m0_cas_rep *rep,
862  enum cas_fom_phase next_phase)
863 {
864  struct m0_cas_rec *orec = cas_out_at(rep, fom->cf_opos);
865  struct m0_rpc_at_buf *in;
866  int result;
867 
868  M0_ENTRY("fom %p, opc %d, opos %"PRIu64, fom, opc, fom->cf_opos);
869  m0_rpc_at_init(&orec->cr_key);
870  in = cas_out_complementary(opc, op, true, fom->cf_opos);
871  result = cas_at_reply(in,
872  &orec->cr_key,
873  &fom->cf_out_key,
874  &fom->cf_fom,
875  next_phase);
876  cas_update_kv_stats(fom, &orec->cr_key, fom->cf_out_key.b_nob,
878  return result;
879 }
880 
884 static int cas_val_send(struct cas_fom *fom,
885  const struct m0_cas_op *op,
886  enum m0_cas_opcode opc,
887  const struct m0_cas_rep *rep,
888  enum cas_fom_phase next_phase)
889 {
890  struct m0_cas_rec *orec = cas_out_at(rep, fom->cf_opos);
891  struct m0_rpc_at_buf *in;
892  int result;
893 
894  M0_ENTRY("fom %p, opc %d, opos %"PRIu64, fom, opc, fom->cf_opos);
895  m0_rpc_at_init(&orec->cr_val);
896  in = cas_out_complementary(opc, op, false, fom->cf_opos);
897  result = cas_at_reply(in,
898  &orec->cr_val,
899  &fom->cf_out_val,
900  &fom->cf_fom,
901  next_phase);
902  cas_update_kv_stats(fom, &orec->cr_val, fom->cf_out_val.b_nob,
904  return result;
905 }
906 
907 static int cas_op_recs_check(struct cas_fom *fom,
908  enum m0_cas_opcode opc,
909  enum m0_cas_type ct,
910  struct m0_cas_op *op)
911 {
912  /*
913  * Tricky: cas_is_valid() has side effects when working
914  * with meta: it fills ->cf_in_cids.
915  */
916  return op->cg_rec.cr_nr != 0 && op->cg_rec.cr_rec != NULL &&
917  m0_forall(i, op->cg_rec.cr_nr,
918  cas_is_valid(fom, opc, ct, cas_at(op, i), i)) ?
919  M0_RC(0) : M0_ERR(-EPROTO);
920 }
921 
923 {
924  struct m0_fom *fom0 = &fom->cf_fom;
925  bool payload_exceeded = false;
926 
927  /*
928  * For some unit tests it is ok when item session is NULL, we can't call
929  * m0_rpc_item_max_payload_exceeded() in this case.
930  */
931  if (!cas_in_ut() || fom0->fo_fop->f_item.ri_session != NULL)
932  payload_exceeded =
934  &fom0->fo_rep_fop->f_item,
935  fom0->fo_fop->f_item.ri_session);
936  return payload_exceeded;
937 }
938 
939 static bool cas_key_need_to_send(struct cas_fom *fom, enum m0_cas_opcode opc,
940  enum m0_cas_type ct, struct m0_cas_op *op,
941  uint64_t rec_pos)
942 {
943  struct m0_buf in_key;
944  struct m0_buf in_val;
945  struct m0_buf key;
946  struct m0_buf val;
947  struct m0_cas_id *cid;
948  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
949  bool key_send = true;
950 
951  if (opc == CO_CUR && fom->cf_curpos == 0) {
952  if (op->cg_flags & COF_EXCLUDE_START_KEY) {
953  if (op->cg_flags & COF_SLANT) {
954  m0_ctg_cursor_kv_get(ctg_op, &key, &val);
955  if (ct != CT_META) {
956  cas_incoming_kv(fom, rec_pos, &in_key,
957  &in_val);
958  if (m0_buf_eq(&key, &in_key))
959  key_send = false;
960  } else {
961  cid = &fom->cf_in_cids[rec_pos];
962  if (m0_fid_eq(&cid->ci_fid, key.b_addr))
963  key_send = false;
964  }
965  } else
966  key_send = false;
967  }
968 
969  if (!key_send)
970  fom->cf_startkey_excluded = true;
971  }
972 
973  return key_send;
974 }
975 
976 static void cas_fom_cleanup(struct cas_fom *fom, bool ctg_op_fini)
977 {
978  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
979  struct m0_cas_ctg *meta = m0_ctg_meta();
980  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
981  struct m0_cas_ctg *dead_index = m0_ctg_dead_index();
982 
983  m0_long_unlock(m0_ctg_lock(meta), &fom->cf_meta);
984  m0_long_unlock(m0_ctg_lock(ctidx), &fom->cf_ctidx);
985  m0_long_unlock(m0_ctg_lock(dead_index), &fom->cf_dead_index);
986  m0_long_unlock(m0_ctg_del_lock(), &fom->cf_del_lock);
987  if (fom->cf_ctg != NULL)
988  m0_long_unlock(m0_ctg_lock(fom->cf_ctg), &fom->cf_lock);
989  if (ctg_op_fini) {
990  if (m0_ctg_cursor_is_initialised(ctg_op))
991  m0_ctg_cursor_fini(ctg_op);
992  m0_ctg_op_fini(ctg_op);
993  }
994 }
995 
996 static void cas_fom_failure(struct cas_fom *fom, int rc, bool ctg_op_fini)
997 {
998  struct m0_cas_rep *repdata;
999  struct m0_cas_rec *repv;
1000  uint64_t i;
1001 
1002  M0_PRE(rc < 0);
1003 
1004  /*
1005  * Some generic error happened, no input record can be processed.
1006  * Clean already filled items in reply operation results.
1007  * CAS client shouldn't access cgr_rep array if cgr_rc is non-zero.
1008  */
1009  repdata = m0_fop_data(fom->cf_fom.fo_rep_fop);
1010  for (i = 0; i < repdata->cgr_rep.cr_nr; i++) {
1011  repv = &repdata->cgr_rep.cr_rec[i];
1014  }
1015  m0_free(repdata->cgr_rep.cr_rec);
1016  repdata->cgr_rep.cr_rec = NULL;
1017  repdata->cgr_rep.cr_nr = 0;
1018 
1019  cas_fom_cleanup(fom, ctg_op_fini);
1021 }
1022 
1023 static int cas_dtm0_logrec_credit_add(struct m0_fom *fom0)
1024 {
1025  struct m0_be_tx_credit cred = {};
1026  struct m0_xcode_ctx ctx = {};
1027  int rc;
1028 
1029  M0_ENTRY();
1030 
1032  &M0_XCODE_OBJ(m0_cas_op_xc, cas_op(fom0)));
1033  if (rc < 0)
1034  return M0_ERR(rc);
1035 
1036  M0_ASSERT(rc > 0);
1037 
1039  &cas_op(fom0)->cg_txd,
1040  &((struct m0_buf) { .b_nob = rc }),
1041  m0_fom_reqh(fom0)->rh_beseg,
1042  NULL, &cred);
1043  m0_be_tx_credit_add(&fom0->fo_tx.tx_betx_cred, &cred);
1044 
1045  return M0_RC(0);
1046 }
1047 
1048 static int cas_dtm0_logrec_add(struct m0_fom *fom0,
1049  enum m0_dtm0_tx_pa_state state)
1050 {
1051  /* log the dtm0 logrec before completing the cas op */
1052  struct m0_dtm0_service *dtms =
1054  struct m0_dtm0_tx_desc *msg = &cas_op(fom0)->cg_txd;
1055  struct m0_buf buf = {};
1056  int i;
1057  int rc;
1058 
1059  for (i = 0; i < msg->dtd_ps.dtp_nr; ++i) {
1060  if (m0_fid_eq(&msg->dtd_ps.dtp_pa[i].p_fid,
1061  &dtms->dos_generic.rs_service_fid)) {
1062  msg->dtd_ps.dtp_pa[i].p_state = state;
1063  break;
1064  }
1065  }
1067  &buf.b_addr, &buf.b_nob) ?:
1068  m0_dtm0_logrec_update(dtms->dos_log, &fom0->fo_tx.tx_betx, msg,
1069  &buf);
1070  m0_buf_free(&buf);
1071 
1072  return rc;
1073 }
1074 
1075 static void cas_fom_success(struct cas_fom *fom, enum m0_cas_opcode opc)
1076 {
1077  cas_fom_cleanup(fom, opc == CO_CUR);
1079 }
1080 
1081 static void addb2_add_kv_attrs(const struct cas_fom *fom, enum stats_kv_io kv_io)
1082 {
1083  int i;
1084  int j;
1085  uint64_t sm_id = m0_sm_id_get(&fom->cf_fom.fo_sm_phase);
1086  static int kv_stats_labels[STATS_KV_NR][STATS_KV_IO_NR][STATS_NR] =
1087  {
1088  {
1089  {
1093  },
1094  {
1098  },
1099  },
1100  {
1101  {
1105  },
1106  {
1110  },
1111  }
1112  };
1113 
1114  for (i = 0; i < STATS_KV_NR; i++) {
1115  for (j = 0; j < STATS_NR; j++)
1116  M0_ADDB2_ADD(M0_AVI_ATTR, sm_id,
1117  kv_stats_labels[i][kv_io][j],
1118  fom->cf_kv_stats[i][kv_io][j]);
1119  }
1120 }
1121 
1129 static bool op_is_index_drop(enum m0_cas_opcode opc,
1130  enum m0_cas_type ct)
1131 {
1132  return CTG_OP_COMBINE(CO_DEL, CT_META) == CTG_OP_COMBINE(opc, ct);
1133 }
1134 
1135 /*
1136  * Wait for the BE transaction to be persisted.
1137  */
1138 static int op_sync_wait(struct m0_fom *fom)
1139 {
1140  struct m0_be_tx *tx;
1141 
1142  tx = m0_fom_tx(fom);
1143  if (fom->fo_tx.tx_state != M0_DTX_INVALID) {
1144  /*
1145  * The above checking of 'fom tx' state must go first
1146  * before any other access or checking on this fom tx,
1147  * because if the fom tx is not initialized, any access
1148  * of this fom tx is not safe or useful.
1149  */
1150 
1151  if (m0_be_tx_state(tx) < M0_BTS_LOGGED) {
1152  M0_LOG(M0_DEBUG, "fom wait for tx to be logged");
1153  m0_fom_wait_on(fom, &tx->t_sm.sm_chan, &fom->fo_cb);
1154  return M0_FSO_WAIT;
1155  }
1156  }
1157  return M0_FSO_AGAIN;
1158 }
1159 
1160 static int cas_fom_tick(struct m0_fom *fom0)
1161 {
1162  uint64_t i;
1163  int rc;
1164  int result = M0_FSO_AGAIN;
1165  struct cas_fom *fom = M0_AMB(fom, fom0, cf_fom);
1166  int phase = m0_fom_phase(fom0);
1167  struct m0_cas_op *op = cas_op(fom0);
1168  struct m0_cas_rep *rep = m0_fop_data(fom0->fo_rep_fop);
1169  enum m0_cas_opcode opc = m0_cas_opcode(fom0->fo_fop);
1170  enum m0_cas_type ct = cas_type(fom0);
1171  bool is_meta = ct == CT_META;
1172  struct m0_cas_ctg *ctg = fom->cf_ctg;
1173  size_t ipos = fom->cf_ipos;
1174  struct m0_cas_id *icid = &fom->cf_in_cids[ipos];
1175  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
1176  struct cas_service *service = M0_AMB(service,
1177  fom0->fo_service, c_service);
1178  struct m0_cas_ctg *meta = m0_ctg_meta();
1179  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
1180  struct m0_cas_rec *rec = NULL;
1181  bool is_dtm0_used = ENABLE_DTM0 &&
1182  !m0_dtm0_tx_desc_is_none(&op->cg_txd);
1183  bool is_index_drop;
1184  bool do_ctidx;
1185  int next_phase;
1186 
1187  M0_ENTRY("fom %p phase %d (%s) op_flag=0x%x", fom, phase,
1188  m0_fom_phase_name(fom0, phase), op->cg_flags);
1189 
1190  M0_PRE(ctidx != NULL);
1192  M0_PRE(ergo(is_dtm0_used, m0_dtm0_tx_desc__invariant(&op->cg_txd)));
1193  /*
1194  * If COF_NO_DTM is set, no DTM is needed for this operation.
1195  * CAS service may take more special actions based on this flag.
1196  */
1197  M0_PRE(ergo(op->cg_flags & COF_NO_DTM,
1198  m0_dtm0_tx_desc_is_none(&op->cg_txd)));
1199 
1200  if (!M0_IS0(&op->cg_txd) && phase == M0_FOPH_INIT)
1201  M0_LOG(M0_DEBUG, "Got CAS with txid: " DTID0_F,
1202  DTID0_P(&op->cg_txd.dtd_id));
1203 
1204  is_index_drop = op_is_index_drop(opc, ct);
1205 
1206  switch (phase) {
1207  case M0_FOPH_INIT ... M0_FOPH_NR - 1:
1208 
1209  if (phase == M0_FOPH_INIT && !fom->cf_op_checked) {
1211  break;
1212  }
1213  if (phase == M0_FOPH_TXN_COMMIT) {
1214  /* Piggyback some information about the transaction */
1215  if (M0_IN(opc, (CO_PUT, CO_DEL)))
1216  m0_fom_mod_rep_fill(&rep->cgr_mod_rep, fom0);
1217  }
1218  if (phase == M0_FOPH_FAILURE) {
1219  /*
1220  * Cleanup locks, etc. in case a failure happened in a
1221  * generic phase. When a failure happens in our phase,
1222  * cas_fom_failure() is called explicitly.
1223  */
1224  cas_fom_cleanup(fom, false);
1225  }
1226  result = m0_fom_tick_generic(fom0);
1227  if (m0_fom_phase(fom0) == M0_FOPH_TXN_OPEN) {
1228  M0_ASSERT(phase == M0_FOPH_TXN_INIT);
1229  m0_fom_phase_set(fom0, CAS_START);
1230  }
1231  if (phase == M0_FOPH_TXN_COMMIT) {
1232  if (op->cg_flags & COF_SYNC_WAIT)
1233  result = op_sync_wait(fom0);
1234  }
1235  if (cas_in_ut() && m0_fom_phase(fom0) == M0_FOPH_QUEUE_REPLY) {
1237  }
1238 
1239  /*
1240  * Once the transition TXN_DONE_WAIT->FINISH is completed,
1241  * we need to send out P-msg if DTM0 is in use.
1242  */
1243  if (phase == M0_FOPH_TXN_DONE_WAIT &&
1244  m0_fom_phase(fom0) == M0_FOPH_FINISH && is_dtm0_used) {
1245  rc = m0_dtm0_on_committed(fom0, &cas_op(fom0)->cg_txd.dtd_id);
1246  if (rc != 0)
1247  M0_LOG(M0_WARN, "Could not send PERSISTENT "
1248  "messages out");
1249  }
1250  break;
1251  case CAS_CHECK_PRE:
1252  rc = cas_id_check(&op->cg_id);
1253  if (rc == 0) {
1254  if (cas_fid_is_cctg(&op->cg_id.ci_fid))
1255  result = cas_ctidx_lookup(fom, &op->cg_id,
1256  CAS_CHECK);
1257  else
1258  m0_fom_phase_set(fom0, CAS_CHECK);
1259  } else
1261  break;
1262  case CAS_CHECK:
1263  rc = cas_op_check(op, fom, is_index_drop);
1264  if (rc == 0) {
1265  fom->cf_op_checked = true;
1267  m0_sm_id_get(&fom0->fo_sm_phase),
1269  fom->cf_ikv_nr);
1271  } else
1273  break;
1274  case CAS_START:
1275  if (is_meta) {
1276  /*
1277  * Assign meta to ctg to re-use CAS_LOCK state.
1278  * If this is write op on meta, will need W lock.
1279  */
1280  fom->cf_ctg = meta;
1282  } else
1284  break;
1285  case CAS_META_LOCK:
1286  result = m0_long_read_lock(m0_ctg_lock(meta),
1287  &fom->cf_meta,
1288  CAS_META_LOOKUP);
1289  result = M0_FOM_LONG_LOCK_RETURN(result);
1290  break;
1291  case CAS_META_LOOKUP:
1292  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1293  result = m0_ctg_meta_lookup(ctg_op,
1294  &op->cg_id.ci_fid,
1296  break;
1297  case CAS_META_LOOKUP_DONE:
1298  rc = m0_ctg_op_rc(ctg_op);
1299  if (rc == 0) {
1300  fom->cf_ctg = m0_ctg_meta_lookup_result(ctg_op);
1301  M0_ASSERT(fom->cf_ctg != NULL);
1303  } else if (rc == -ENOENT && opc == CO_PUT &&
1304  (op->cg_flags & COF_CROW)) {
1305  m0_long_unlock(m0_ctg_lock(meta), &fom->cf_meta);
1306  rc = cas_ctg_crow_handle(fom, &op->cg_id);
1307  if (rc == 0) {
1309  result = M0_FSO_WAIT;
1310  }
1311  }
1312  m0_ctg_op_fini(ctg_op);
1313  if (rc != 0)
1314  cas_fom_failure(fom, rc, false);
1315  break;
1316  case CAS_CTG_CROW_DONE:
1317  /*
1318  * EEXIST is treated as success as desired index may be created
1319  * by some other fom after the index lookup resulted ENOENT and
1320  * before the index creation attempt initiated by current fom
1321  * when CROW is used.
1322  *
1323  * Example: simultaneous put of keys key1 and key2 into the same
1324  * index index1, index1 is empty on this node and not yet
1325  * phisically created as CROW is used for distributed indices,
1326  * keys are sent in a separate fops fop1 and fop2:
1327  *
1328  * 1.1. put fop1 recevied -> fom1
1329  * 1.2. put fop2 recevied -> fom2
1330  * 2.1. fom1 -> lookup index1 -> ENOENT
1331  * 2.2. fom2 -> lookup index1 -> ENOENT
1332  * 3.1. fom1 -> run fom1.1 to create index1
1333  * 3.2. fom2 -> run fom2.1 to create index1
1334  * 4.1. fom1.1 -> create index1 -> SUCCESS
1335  * 4.2. fom2.1 -> create index1 -> EEXIST
1336  * 5.1. fom1.1 -> SUCCESS -> fom1
1337  * 5.2. fom2.1 -> EEXIST -> fom2
1338  * 6.1. fom1 -> lookup index1 -> SUCCESS
1339  * 6.2. fom2 -> send EEXIST
1340  * 7. fom1 -> insert key1 into index1 -> send SUCCESS
1341  */
1342  if (fom->cf_thrall_rc == 0 || fom->cf_thrall_rc == -EEXIST)
1343  m0_fom_phase_set(fom0, CAS_START);
1344  else
1345  cas_fom_failure(fom, fom->cf_thrall_rc, false);
1346  break;
1347  case CAS_LOAD_KEY:
1348  result = cas_at_load(&cas_at(op, ipos)->cr_key, fom0,
1349  CAS_LOAD_VAL);
1350  break;
1351  case CAS_LOAD_VAL:
1352  /*
1353  * Don't check result of key loading here, result codes for all
1354  * keys/values are checked in cas_load_check().
1355  */
1356  result = cas_at_load(&cas_at(op, ipos)->cr_val, fom0,
1357  CAS_LOAD_DONE);
1358  break;
1359  case CAS_LOAD_DONE:
1360  /* Record key/value are loaded. */
1361  fom->cf_ipos++;
1362  M0_ASSERT(fom->cf_ipos <= op->cg_rec.cr_nr);
1363  /* Do we need to load other keys and values from op? */
1364  if (fom->cf_ipos == op->cg_rec.cr_nr) {
1365  fom->cf_ipos = 0;
1367  if (rc != 0)
1368  cas_fom_failure(fom, M0_ERR(rc), false);
1369  else {
1371  result = cas_kv_load_done(fom, opc, op,
1372  is_index_drop ?
1374  CAS_LOCK);
1375  }
1376  break;
1377  }
1378  /* Load next key/value. */
1380  break;
1381  case CAS_LOCK:
1382  M0_ASSERT(ctg != NULL);
1383  /*
1384  * In case of index drop use cf_meta lock: we need cf_lock to
1385  * lock index.
1386  */
1387  result = m0_long_lock(m0_ctg_lock(ctg),
1388  !cas_is_ro(opc),
1389  is_index_drop ? &fom->cf_meta :
1390  &fom->cf_lock,
1391  is_meta ? CAS_CTIDX_LOCK : CAS_PREP);
1392  result = M0_FOM_LONG_LOCK_RETURN(result);
1393  fom->cf_ipos = 0;
1394  break;
1395  case CAS_CTIDX_LOCK:
1396  result = m0_long_lock(m0_ctg_lock(m0_ctg_ctidx()), !cas_is_ro(opc),
1397  &fom->cf_ctidx, CAS_PREP);
1398  result = M0_FOM_LONG_LOCK_RETURN(result);
1399  break;
1400  case CAS_DEAD_INDEX_LOCK:
1402  &fom->cf_dead_index, CAS_LOCK);
1403  result = M0_FOM_LONG_LOCK_RETURN(result);
1404  break;
1405  case CAS_PREP:
1406  M0_ASSERT(m0_forall(i, rep->cgr_rep.cr_nr,
1407  rep->cgr_rep.cr_rec[i].cr_rc == 0));
1408 
1409  rc = is_meta ? 0 : cas_op_recs_check(fom, opc, ct, op);
1410  if (rc != 0) {
1411  cas_fom_failure(fom, M0_ERR(rc), false);
1412  break;
1413  }
1414 
1415  for (i = 0; i < op->cg_rec.cr_nr; i++)
1416  cas_prep(fom, opc, ct, ctg, i,
1417  &fom0->fo_tx.tx_betx_cred);
1418  fom->cf_ipos = 0;
1419  fom->cf_opos = 0;
1420  if (opc == CO_CUR) {
1421  /*
1422  * There is only one catalogue operation context for
1423  * cursor operation.
1424  */
1425  m0_ctg_op_init(&fom->cf_ctg_op, fom0,
1426  cas_op(fom0)->cg_flags);
1427  }
1428 
1429  /*
1430  * If dtm0 is used we need to calculate credits for creating
1431  * a dtm0 log record.
1432  */
1433  if (is_dtm0_used) {
1435  if (rc != 0) {
1436  cas_fom_failure(fom, M0_ERR(rc), false);
1437  break;
1438  }
1439  }
1440 
1442  /*
1443  * @todo waiting for transaction open with btree (which can be
1444  * the meta-catalogue) locked, because tree height has to be
1445  * fixed for the correct credit calculation.
1446  */
1447  break;
1448  case CAS_TXN_OPENED:
1449  m0_fom_phase_set(fom0, is_meta ? CAS_LOOP : CAS_META_UNLOCK);
1450  break;
1451  case CAS_META_UNLOCK:
1452  M0_ASSERT(!is_meta);
1453  m0_long_read_unlock(m0_ctg_lock(meta), &fom->cf_meta);
1454  m0_fom_phase_set(fom0, CAS_LOOP);
1455  break;
1456  case CAS_LOOP:
1457  /* Skip empty CUR requests. */
1458  while (opc == CO_CUR && ipos < op->cg_rec.cr_nr &&
1459  cas_at(op, ipos)->cr_rc == 0)
1460  fom->cf_ipos = ++ipos;
1461  /* If all input has been processed... */
1462  if (ipos == op->cg_rec.cr_nr ||
1463  /* ... or all output has been generated. */
1464  fom->cf_opos == rep->cgr_rep.cr_nr) {
1465  /*
1466  * Check reply payload size against max RPC item payload
1467  * size.
1468  */
1470  cas_fom_failure(fom, M0_ERR(-E2BIG),
1471  opc == CO_CUR);
1472  else {
1473  if (is_dtm0_used)
1474  m0_fom_phase_set(fom0, CAS_DTM0);
1475  else
1476  cas_fom_success(fom, opc);
1477  }
1479  } else {
1480  do_ctidx = cas_ctidx_op_needed(fom, opc, ct, ipos);
1481  result = cas_exec(fom, opc, ct, ctg, ipos,
1482  is_index_drop ?
1484  do_ctidx ? CAS_CTIDX :
1486  }
1487  break;
1488  /*
1489  * Here are states specific to catalogue-index.
1490  */
1491  case CAS_CTIDX:
1492  M0_ASSERT(fom->cf_opos < rep->cgr_rep.cr_nr);
1493  rec = cas_out_at(rep, fom->cf_opos);
1494  M0_ASSERT(rec != NULL);
1495  if (rec->cr_rc == 0 && m0_ctg_op_rc(ctg_op) == 0) {
1496  m0_ctg_op_fini(ctg_op);
1497  m0_fom_phase_set(fom0,
1498  opc == CO_PUT ? CAS_CTIDX_MEM_PLACE :
1500  } else
1502  break;
1503  case CAS_CTIDX_MEM_PLACE:
1504  result = cas_ctidx_mem_place(fom, icid, CAS_CTIDX_INSERT);
1505  break;
1506  case CAS_CTIDX_INSERT:
1507  result = cas_ctidx_insert(fom, icid, is_index_drop ?
1508  CAS_IDROP_LOOP :
1510  break;
1511  case CAS_CTIDX_LOOKUP:
1512  result = cas_ctidx_lookup(fom, icid, CAS_CTIDX_MEM_FREE);
1513  break;
1514  case CAS_CTIDX_MEM_FREE:
1516  break;
1517  case CAS_CTIDX_DELETE:
1518  result = cas_ctidx_delete(fom, icid,
1519  is_index_drop ? CAS_IDROP_LOOP : CAS_PREPARE_SEND);
1520  break;
1521 
1522  /*
1523  * Here are states specific to index drop.
1524  */
1525  case CAS_INSERT_TO_DEAD:
1526  /*
1527  * We are here if this is index drop.
1528  * Now insert it into dead_index to process actual tree delete
1529  * by background garbage collector.
1530  */
1531  rc = m0_ctg_op_rc(ctg_op);
1532  if (rc == 0) {
1533  /*
1534  * Just completed meta lookup for this fid.
1535  */
1536  fom->cf_ctg = m0_ctg_meta_lookup_result(ctg_op);
1537  /*
1538  * Meta stores pointers to catalogues.
1539  * We will need catalogue later to lock it.
1540  */
1541  fom->cf_moved_ctgs[ipos] = fom->cf_ctg;
1542  /*
1543  * Insert to dead index, then can remove from meta.
1544  */
1545  m0_ctg_op_fini(ctg_op);
1546  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1547  M0_LOG(M0_DEBUG, "Insert to dead ctg %p", fom->cf_ctg);
1548  m0_ctg_dead_index_insert(ctg_op,
1549  fom->cf_ctg,
1551  /*
1552  * The catalogue (cf_ctg) could be finalized anytime
1553  * after this point by the garbage collector (cgc fom).
1554  * Setting it to NULL below prevents any invalid memory
1555  * access during cas fom cleanup later.
1556  */
1557  m0_long_unlock(m0_ctg_lock(fom->cf_ctg),
1558  &fom->cf_lock);
1559  fom->cf_ctg = NULL;
1560  } else {
1561  M0_LOG(M0_DEBUG, "lookup in meta failed");
1562  fom->cf_moved_ctgs[ipos] = NULL;
1564  }
1565  break;
1566  case CAS_DELETE_FROM_META:
1567  rc = m0_ctg_op_rc(ctg_op);
1568  if (rc == 0) {
1569  m0_ctg_op_fini(ctg_op);
1570  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1571  do_ctidx = cas_ctidx_op_needed(fom, opc, ct, ipos);
1572  m0_ctg_meta_delete(ctg_op,
1573  &fom->cf_in_cids[ipos].ci_fid,
1574  do_ctidx ? CAS_CTIDX :
1575  CAS_IDROP_LOOP);
1576  } else {
1577  M0_LOG(M0_DEBUG, "insert to meta failed");
1579  }
1580  break;
1581  case CAS_IDROP_LOOP:
1582  rc = m0_ctg_op_rc(ctg_op);
1583  fom->cf_ipos = ++ipos;
1584  if (ipos < op->cg_rec.cr_nr)
1585  m0_fom_phase_set(fom0, CAS_LOOP);
1586  else {
1587  m0_ctg_op_fini(ctg_op);
1588  /*
1589  * Moved all records from meta to
1590  * dead_index. Now can unlock meta catalogues and
1591  * wait until dropping indices are not used.
1592  */
1593  m0_long_unlock(m0_ctg_lock(meta), &fom->cf_meta);
1594  m0_long_unlock(m0_ctg_lock(ctidx), &fom->cf_ctidx);
1595  fom->cf_ipos = 0;
1596  m0_fom_phase_set(fom0, rc == 0 ? CAS_IDROP_LOCK_LOOP :
1598  }
1599  break;
1600  case CAS_IDROP_LOCK_LOOP:
1601  if (ipos == op->cg_rec.cr_nr) {
1602  /*
1603  * Unlock dead index, so index garbage collector (if
1604  * running) can safely proceed with newly added indices
1605  * in "dead index" catalogue.
1606  */
1608  &fom->cf_dead_index);
1610  } else if (fom->cf_moved_ctgs[ipos] != NULL) {
1611  /*
1612  * Actually we do not need this ctg_op, but in CAS_DONE
1613  * state we free it. Let's do not change it.
1614  */
1615  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1616  /*
1617  * Lock dropping index to be sure nobody keep
1618  * using it.
1619  */
1620  result = m0_long_write_lock(
1621  m0_ctg_lock(fom->cf_moved_ctgs[ipos]),
1622  &fom->cf_lock,
1624  result = M0_FOM_LONG_LOCK_RETURN(result);
1625  } else
1626  m0_fom_phase_set(fom0, CAS_LOOP);
1627  break;
1628  case CAS_IDROP_LOCKED:
1629  /*
1630  * Now can unlock: index is not visible any more and nobody use
1631  * it for sure.
1632  */
1633  m0_long_unlock(m0_ctg_lock(fom->cf_moved_ctgs[ipos]),
1634  &fom->cf_lock);
1636  break;
1637  case CAS_IDROP_START_GC:
1638  /* Start garbage collector, if it is not already running. */
1639  m0_cas_gc_start(&service->c_service);
1640  cas_fom_success(fom, opc);
1641  break;
1642  /*
1643  * End of states specific to index drop.
1644  */
1645 
1646  case CAS_PREPARE_SEND:
1647  next_phase = CAS_DONE;
1648  M0_ASSERT(fom->cf_opos < rep->cgr_rep.cr_nr);
1649  rec = cas_out_at(rep, fom->cf_opos);
1650  M0_ASSERT(rec != NULL);
1651  m0_ctg_op_get_ver(ctg_op,
1652  &cas_out_at(rep, fom->cf_opos)->cr_ver);
1653  if (rec->cr_rc == 0) {
1654  rec->cr_rc = m0_ctg_op_rc(ctg_op);
1655  if (rec->cr_rc == 0) {
1656  if (cas_key_need_to_send(fom, opc, ct, op,
1657  ipos)){
1658  rec->cr_rc =
1659  cas_prep_send(fom, opc, ct);
1660  if (rec->cr_rc == 0)
1661  next_phase = CAS_SEND_KEY;
1662  } else {
1663  if (opc == CO_CUR)
1664  fom->cf_curpos++;
1665  next_phase = CAS_LOOP;
1666  }
1667  }
1668  }
1669  m0_fom_phase_set(fom0, next_phase);
1670  break;
1671  case CAS_SEND_KEY:
1672  if (opc == CO_CUR)
1673  result = cas_key_send(fom, op, opc, rep, CAS_KEY_SENT);
1674  else
1676  break;
1677  case CAS_KEY_SENT:
1678  rec = cas_out_at(rep, fom->cf_opos);
1679  rec->cr_rc = m0_rpc_at_reply_rc(&rec->cr_key);
1680  /*
1681  * Try to send value even if a key is not sent successfully.
1682  * It's necessary to return proper reply in case if bulk
1683  * transmission is required, but the user sent empty AT buffer.
1684  */
1686  break;
1687  case CAS_SEND_VAL:
1688  if (ct == CT_BTREE && M0_IN(opc, (CO_GET, CO_CUR)))
1689  result = cas_val_send(fom, op, opc, rep, CAS_VAL_SENT);
1690  else
1691  m0_fom_phase_set(fom0, CAS_DONE);
1692  break;
1693  case CAS_VAL_SENT:
1694  rec = cas_out_at(rep, fom->cf_opos);
1695  rec->cr_rc = m0_rpc_at_reply_rc(&rec->cr_val);
1696  m0_fom_phase_set(fom0, CAS_DONE);
1697  break;
1698  case CAS_DONE:
1699  if (cas_done(fom, op, rep, opc) == 0 && is_index_drop)
1701  else
1702  m0_fom_phase_set(fom0, CAS_LOOP);
1703  break;
1704  case CAS_DTM0:
1705  rc = cas_dtm0_logrec_add(&fom->cf_fom,
1707  if (rc != 0)
1708  cas_fom_failure(fom, M0_ERR(rc), opc == CO_CUR);
1709  else
1710  cas_fom_success(fom, opc);
1711  break;
1712  default:
1713  M0_IMPOSSIBLE("Invalid phase");
1714  }
1715 
1717  return M0_RC(result);
1718 }
1719 
/*
 * Unit-test hooks used by cas_fom_fini(). When cas_in_ut() is true and a hook
 * is non-NULL:
 *  - cas__ut_cb_done is called after the input AT buffers of the request
 *    records have been finalised;
 *  - cas__ut_cb_fini is called last, after the FOM memory has been freed, so
 *    its argument must not be dereferenced.
 */
M0_INTERNAL void (*cas__ut_cb_done)(struct m0_fom *fom);
M0_INTERNAL void (*cas__ut_cb_fini)(struct m0_fom *fom);
1722 
1723 static void cas_fom_fini(struct m0_fom *fom0)
1724 {
1725  struct m0_cas_rec *rec;
1726  struct m0_cas_op *op = cas_op(fom0);
1727  struct cas_fom *fom = M0_AMB(fom, fom0, cf_fom);
1728  uint64_t i;
1729 
1730  for (i = 0; i < op->cg_rec.cr_nr; i++) {
1731  rec = cas_at(op, i);
1732 
1733  /* Finalise input AT buffers. */
1734  cas_at_fini(&rec->cr_key);
1735  cas_at_fini(&rec->cr_val);
1736  }
1737 
1738  if (cas_in_ut() && cas__ut_cb_done != NULL)
1739  cas__ut_cb_done(fom0);
1740 
1741  for (i = 0; i < fom->cf_in_cids_nr; i++)
1742  m0_cas_id_fini(&fom->cf_in_cids[i]);
1743  m0_free(fom->cf_in_cids);
1744  m0_free(fom->cf_moved_ctgs);
1745  m0_free(fom->cf_ikv);
1746  m0_long_lock_link_fini(&fom->cf_meta);
1747  m0_long_lock_link_fini(&fom->cf_lock);
1748  m0_long_lock_link_fini(&fom->cf_ctidx);
1749  m0_long_lock_link_fini(&fom->cf_dead_index);
1750  m0_long_lock_link_fini(&fom->cf_del_lock);
1751  m0_fom_fini(fom0);
1752  m0_free(fom);
1753  if (cas_in_ut() && cas__ut_cb_fini != NULL)
1754  cas__ut_cb_fini(fom0);
1755 }
1756 
1757 static const struct m0_fid *cas_fid(const struct m0_fom *fom)
1758 {
1759  return &cas_op(fom)->cg_id.ci_fid;
1760 }
1761 
/**
 * Home locality of a CAS FOM: consecutive values of a file-static counter,
 * independent of the FOM itself (presumably reduced modulo the number of
 * localities by the caller — confirm).
 *
 * NOTE(review): the counter is not synchronised.
 */
static size_t cas_fom_home_locality(const struct m0_fom *fom)
{
	static uint64_t next_loc;

	return next_loc++;
}
1768 
1769 static struct m0_cas_op *cas_op(const struct m0_fom *fom)
1770 {
1771  return m0_fop_data(fom->fo_fop);
1772 }
1773 
/**
 * Returns the CAS operation code of @fop, asserting that it lies in
 * [0, CO_NR).
 *
 * NOTE(review): no assignment to @opcode is visible before it is asserted on;
 * confirm it is initialised (presumably from the FOP's item type).
 */
static enum m0_cas_opcode m0_cas_opcode(const struct m0_fop *fop)
{
	enum m0_cas_opcode opcode;

	M0_ASSERT(0 <= opcode && opcode < CO_NR);
	return opcode;
}
1782 
1783 static int cas_sdev_state(struct m0_poolmach *pm,
1784  uint32_t sdev_idx,
1785  enum m0_pool_nd_state *state_out)
1786 {
1787  int i;
1788 
1789  if (M0_FI_ENABLED("sdev_fail")) {
1790  *state_out = M0_PNDS_FAILED;
1791  return 0;
1792  }
1793  for (i = 0; i < pm->pm_state->pst_nr_devices; i++) {
1794  struct m0_pooldev *sdev = &pm->pm_state->pst_devices_array[i];
1795 
1796  if (sdev->pd_sdev_idx == sdev_idx) {
1797  *state_out = sdev->pd_state;
1798  return 0;
1799  }
1800  }
1801  return M0_ERR(-EINVAL);
1802 }
1803 
/**
 * Checks that the device hosting catalogue @cid is usable.
 *
 * Only component-catalogue FIDs are checked. The check is also skipped in
 * unit tests that run without pools. Otherwise:
 * - the device id is taken from the FID;
 * - the pool version is resolved (presumably from the layout descriptor's
 *   ld_pver — the lookup call is not visible here);
 * - the device state is read from that version's pool machine.
 *
 * @retval 0        check passed or not applicable.
 * @retval -EBADFD   device state is outside the accepted set (which includes
 *                   M0_PNDS_ONLINE; the rest of the list is not visible here).
 * @retval -EINVAL   pool version not found, or device unknown to the pool
 *                   machine (from cas_sdev_state()).
 */
static int cas_device_check(const struct cas_fom *fom,
			    const struct m0_cas_id *cid)
{
	uint32_t                device_id;
	enum m0_pool_nd_state   state;
	struct m0_pool_version *pver;
	struct m0_poolmach     *pm;
	struct cas_service     *svc = M0_AMB(svc, fom->cf_fom.fo_service,
					     c_service);
	struct m0_pools_common *pc = svc->c_service.rs_reqh->rh_pools;
	int                     rc = 0;

	/*
	 * For some unit tests it's ok when pools is NULL, skip checking of
	 * device in this case.
	 */
	if (cas_fid_is_cctg(&cid->ci_fid) && (!cas_in_ut() || pc != NULL)) {
		device_id = m0_dix_fid_cctg_device_id(&cid->ci_fid);
				&cid->ci_layout.u.dl_desc.ld_pver);
		if (pver != NULL) {
			pm = &pver->pv_mach;
			rc = cas_sdev_state(pm, device_id, &state);
			if (rc == 0 && !M0_IN(state, (M0_PNDS_ONLINE,
				rc = M0_ERR(-EBADFD);
		} else
			rc = M0_ERR(-EINVAL);
	}
	return M0_RC(rc);
}
1842 
/**
 * Validates catalogue id @cid.
 *
 * The FID must be valid and of an accepted FID type (the list includes
 * m0_cctg_fid_type; its first entries are not visible here). A
 * component-catalogue id must also carry a DIX_LTYPE_DESCR layout.
 *
 * @retval 0        @cid is acceptable.
 * @retval -EPROTO  otherwise.
 */
static int cas_id_check(const struct m0_cas_id *cid)
{
	const struct m0_dix_layout *layout;
	int                         rc = 0;

	if (!m0_fid_is_valid(&cid->ci_fid) ||
	&m0_cctg_fid_type)))
		rc = M0_ERR(-EPROTO);

	if (rc == 0 && cas_fid_is_cctg(&cid->ci_fid)) {
		layout = &cid->ci_layout;
		if (layout->dl_type != DIX_LTYPE_DESCR)
			rc = M0_ERR(-EPROTO);
	}
	return rc;
}
1860 
1861 static int cas_op_check(struct m0_cas_op *op,
1862  struct cas_fom *fom,
1863  bool is_index_drop)
1864 {
1865  struct m0_fom *fom0 = &fom->cf_fom;
1866  enum m0_cas_opcode opc = m0_cas_opcode(fom0->fo_fop);
1867  enum m0_cas_type ct = cas_type(fom0);
1868  bool is_meta = ct == CT_META;
1869  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
1870  const struct m0_cas_id *cid = &op->cg_id;
1871  uint32_t flags = cas_op(fom0)->cg_flags;
1872  struct m0_buf buf;
1873  const struct m0_dix_layout *layout;
1874  struct m0_dix_layout *stored_layout;
1875  int rc = 0;
1876 
1877  if (cas_fid_is_cctg(&cid->ci_fid)) {
1878  rc = m0_ctg_op_rc(ctg_op);
1879  if (rc == 0) {
1880  m0_ctg_lookup_result(ctg_op, &buf);
1881  layout = &cid->ci_layout;
1882  stored_layout = (struct m0_dix_layout *)buf.b_addr;
1883  /* Match stored and received layouts. */
1884  if (!m0_dix_layout_eq(layout, stored_layout))
1885  rc = M0_ERR(-EKEYEXPIRED);
1886  } else if (rc == -ENOENT && (flags & COF_CROW))
1887  /*
1888  * It's OK to not find layout with CROW flag set,
1889  * because the catalogue may be not created yet.
1890  */
1891  rc = 0;
1892  m0_ctg_op_fini(ctg_op);
1893  }
1894 
1895  if (rc == 0) {
1896  rc = cas_device_check(fom, &op->cg_id);
1897  if (rc == 0 && is_meta && fom->cf_ikv_nr != 0) {
1898  M0_ALLOC_ARR(fom->cf_in_cids, fom->cf_ikv_nr);
1899  if (fom->cf_in_cids == NULL)
1900  rc = -ENOMEM;
1901  if (is_index_drop) {
1902  /*
1903  * Store here pointers to records in dead_index
1904  * to be able to lock dropped indices. Need
1905  * to store fom->cf_ikv_nr records (== number of
1906  * keys to delete).
1907  */
1908  M0_ALLOC_ARR(fom->cf_moved_ctgs,
1909  fom->cf_ikv_nr);
1910  if (fom->cf_moved_ctgs == NULL)
1911  rc = -ENOMEM;
1912  }
1913  }
1914  }
1915  if (rc == 0)
1916  /*
1917  * Note: fill cf_in_cids there.
1918  */
1919  rc = cas_op_recs_check(fom, opc, ct, op);
1920 
1921  return M0_RC(rc);
1922 }
1923 
/**
 * Checks that request record @rec at position @rec_pos is well-formed for
 * operation @opc on a catalogue of type @ct.
 *
 * The presence of the key and value AT buffers must match what @opc
 * expects, and for GET/DEL/PUT cr_rc must be 0. For CO_CUR, cr_rc is not
 * checked; cas_done() uses it as the number of records to iterate.
 *
 * For meta catalogues the key is an encoded m0_cas_id. It is decoded, and
 * its FID type and index mask are validated. For component catalogues the
 * hosting device is checked as well. A valid decoded id is stored in
 * fom->cf_in_cids[@rec_pos] and fom->cf_in_cids_nr is incremented.
 *
 * NOTE(review): when the id decodes successfully but fails validation, it is
 * neither stored nor finalised here. If decoding allocated memory (e.g. the
 * imask range array), that memory appears to leak — confirm.
 */
static bool cas_is_valid(struct cas_fom *fom, enum m0_cas_opcode opc,
			 enum m0_cas_type ct, const struct m0_cas_rec *rec,
			 uint64_t rec_pos)
{
	bool result;
	bool gotkey;
	bool gotval;
	bool meta = ct == CT_META;

	gotkey = m0_rpc_at_is_set(&rec->cr_key);
	gotval = m0_rpc_at_is_set(&rec->cr_val);
	switch (opc) {
	case CO_GET:
	case CO_DEL:
		result = gotkey && !gotval && rec->cr_rc == 0;
		break;
	case CO_PUT:
		/* Meta PUT carries no value; other PUTs must carry one. */
		result = gotkey && (gotval == !meta) && rec->cr_rc == 0;
		break;
	case CO_CUR:
		result = gotkey && !gotval;
		break;
	case CO_REP:
		/* No value exactly when rc is negative or catalogue is meta. */
		result = !gotval == (((int64_t)rec->cr_rc) < 0 || meta);
		break;
	case CO_GC:
	case CO_MIN:
	case CO_TRUNC:
	case CO_DROP:
		result = true;
		break;
	default:
		M0_IMPOSSIBLE("Wrong opcode.");
	}
	if (meta && gotkey && result) {
		int                        rc;
		struct m0_cas_id           cid = {};
		struct m0_buf              key;
		const struct m0_dix_imask *imask;

		/*
		 * A valid key is always sent inline, so m0_rpc_at_get() is
		 * expected to succeed (rc == 0).
		 * In the meta case the key is an encoded m0_cas_id.
		 */
		rc = m0_rpc_at_get(&rec->cr_key, &key) ?:
			cas_buf_cid_decode(&key, &cid);
		if (rc == 0) {
			imask = &cid.ci_layout.u.dl_desc.ld_imask;

			result = m0_fid_is_valid(&cid.ci_fid) &&
				M0_IN(m0_fid_type_getfid(&cid.ci_fid),
				      &m0_cctg_fid_type));
			if (result) {
				/*
				 * A component catalogue's mask is consistent
				 * when its range pointer is NULL exactly when
				 * its count is 0. Other catalogues must carry
				 * no mask at all.
				 */
				if (cas_fid_is_cctg(&cid.ci_fid))
					result = (imask->im_range == NULL) ==
						(imask->im_nr == 0) &&
						cas_device_check(fom, &cid) == 0;
				else
					result = (imask->im_range == NULL &&
						  imask->im_nr == 0);
			}
		} else
			result = false;

		if (result) {
			fom->cf_in_cids[rec_pos] = cid;
			fom->cf_in_cids_nr++;
		}
	}
	return M0_RC(result);
}
1997 
1998 static bool cas_is_ro(enum m0_cas_opcode opc)
1999 {
2000  return M0_IN(opc, (CO_GET, CO_CUR, CO_REP));
2001 }
2002 
/**
 * Returns the catalogue type addressed by the FOM's request: CT_META
 * (presumably when the request targets the meta catalogue) or CT_BTREE.
 *
 * NOTE(review): the condition selecting CT_META is not visible here — the
 * `else` below has no matching `if`. Confirm against the full source.
 */
static enum m0_cas_type cas_type(const struct m0_fom *fom)
{
		return CT_META;
	else
		return CT_BTREE;
}
2010 
2011 static uint64_t cas_in_nr(const struct m0_fop *fop)
2012 {
2013  const struct m0_cas_op *op = m0_fop_data(fop);
2014 
2015  return op->cg_rec.cr_nr;
2016 }
2017 
2018 static uint64_t cas_out_nr(const struct m0_fop *fop)
2019 {
2020  const struct m0_cas_op *op = m0_fop_data(fop);
2021  uint64_t nr;
2022 
2023  nr = op->cg_rec.cr_nr;
2024  if (m0_cas_opcode(fop) == CO_CUR)
2025  nr = m0_reduce(i, nr, 0, + op->cg_rec.cr_rec[i].cr_rc);
2026  return nr;
2027 }
2028 
/**
 * Decodes an xcode-encoded m0_cas_id held in @enc_buf into @cid, which the
 * caller must pass zero-initialised.
 *
 * NOTE(review): the decoding call that starts the return statement is not
 * visible here; only its trailing arguments are.
 */
static int cas_buf_cid_decode(struct m0_buf    *enc_buf,
			      struct m0_cas_id *cid)
{
	M0_PRE(enc_buf != NULL);
	M0_PRE(cid != NULL);

	M0_PRE(M0_IS0(cid));

		&M0_XCODE_OBJ(m0_cas_id_xc, cid),
		enc_buf->b_addr, enc_buf->b_nob);
}
2041 
/**
 * Returns true when @fid is a component-catalogue FID (presumably by
 * comparing its FID type with m0_cctg_fid_type).
 *
 * NOTE(review): the return statement is not visible here.
 */
static bool cas_fid_is_cctg(const struct m0_fid *fid)
{
	M0_PRE(fid != NULL);
}
2047 
2048 static int cas_place(struct m0_buf *dst, struct m0_buf *src, m0_bcount_t cutoff)
2049 {
2050  int result = 0;
2051 
2052  if (M0_FI_ENABLED("place_fail"))
2053  return M0_ERR(-ENOMEM);
2054 
2055  if (src->b_nob >= cutoff) {
2056  dst->b_addr = m0_alloc_aligned(src->b_nob, m0_pageshift_get());
2057  if (dst->b_addr == NULL)
2058  return M0_ERR(-ENOMEM);
2059  dst->b_nob = src->b_nob;
2060  memcpy(dst->b_addr, src->b_addr, src->b_nob);
2061  } else {
2062  result = m0_buf_copy(dst, src);
2063  }
2064  return M0_RC(result);
2065 }
2066 
2074 static m0_bcount_t cas_kv_nob(const struct m0_buf *inbuf)
2075 {
2076  return inbuf->b_nob + sizeof(uint64_t);
2077 }
2078 
/**
 * Adds to @accum the BE transaction credits needed to execute record
 * @rec_pos of operation @opc on a catalogue of type @ct (@ctg for non-meta).
 *
 * Meta PUT/DEL:
 * - the value must be empty (cas_kv_nob() of an empty buffer equals
 *   sizeof(uint64_t));
 * - catalogue-index (ctidx) credits are added for component catalogues;
 * - catalogue-create credits are added for PUT.
 *
 * In the second group (case labels not visible here):
 * - PUT adds insert credits, plus delete credits with COF_OVERWRITE;
 * - otherwise delete credits are added.
 *
 * NOTE(review): the DEL statement of the meta branch and the case labels of
 * the second group are not visible here.
 */
static void cas_prep(struct cas_fom *fom, enum m0_cas_opcode opc,
		     enum m0_cas_type ct, struct m0_cas_ctg *ctg,
		     uint64_t rec_pos, struct m0_be_tx_credit *accum)
{
	struct m0_fom    *fom0  = &fom->cf_fom;
	uint32_t          flags = cas_op(fom0)->cg_flags;
	struct m0_cas_id *cid;
	struct m0_buf     key;
	struct m0_buf     val;
	m0_bcount_t       knob;
	m0_bcount_t       vnob;

	cas_incoming_kv(fom, rec_pos, &key, &val);
	knob = cas_kv_nob(&key);
	vnob = cas_kv_nob(&val);
	switch (CTG_OP_COMBINE(opc, ct)) {
	case CTG_OP_COMBINE(CO_PUT, CT_META):
	case CTG_OP_COMBINE(CO_DEL, CT_META):
		M0_ASSERT(vnob == sizeof(uint64_t));
		cid = &fom->cf_in_cids[rec_pos];
		if (cas_fid_is_cctg(&cid->ci_fid)) {
			if (opc == CO_PUT)
				m0_ctg_ctidx_insert_credits(cid, accum);
			else
				m0_ctg_ctidx_delete_credits(cid, accum);
		}
		if (opc == CO_PUT)
			m0_ctg_create_credit(accum);
		else
		break;
		if (opc == CO_PUT) {
			if (flags & COF_OVERWRITE)
				/*
				 * Consider credits for possible deletion of
				 * existing key/value to be overwritten, size of
				 * new value should be enough as current
				 * key/value are deleted if size of existing
				 * value is not enough to place new value.
				 */
				m0_ctg_delete_credit(ctg, knob, vnob, accum);
			m0_ctg_insert_credit(ctg, knob, vnob, accum);
		} else
			m0_ctg_delete_credit(ctg, knob, vnob, accum);
		break;
	}
}
2128 
2129 static struct m0_cas_rec *cas_at(struct m0_cas_op *op, int idx)
2130 {
2131  M0_PRE(0 <= idx && idx < op->cg_rec.cr_nr);
2132  return &op->cg_rec.cr_rec[idx];
2133 }
2134 
2135 static struct m0_cas_rec *cas_out_at(const struct m0_cas_rep *rep, int idx)
2136 {
2137  M0_PRE(0 <= idx && idx < rep->cgr_rep.cr_nr);
2138  return &rep->cgr_rep.cr_rec[idx];
2139 }
2140 
/**
 * Moves the FOM to @phase once the key/value AT buffers have been loaded.
 *
 * For CO_DEL with COF_DEL_LOCK, the delete lock (m0_ctg_del_lock()) is
 * first acquired through fom->cf_del_lock, continuing in @phase. Otherwise
 * the phase is set directly and M0_FSO_AGAIN is returned.
 *
 * NOTE(review): the line starting the lock-acquisition return statement is
 * not visible here.
 */
static int cas_kv_load_done(struct cas_fom *fom, enum m0_cas_opcode opc,
			    const struct m0_cas_op *op, int phase)

{
	if (opc == CO_DEL && (op->cg_flags & COF_DEL_LOCK)) {
		/*
		 * Repair or re-balance may be running, take the lock
		 * to protect current component record/catalogue that is under
		 * repair/re-balance from concurrent deletion by the client.
		 */
				m0_ctg_del_lock(),
				&fom->cf_del_lock,
				phase)));
	}

	m0_fom_phase_set(&fom->cf_fom, phase);
	return M0_RC(M0_FSO_AGAIN);
}
2160 
/**
 * Starts the catalogue operation for record @rec_pos of a request with
 * opcode @opc on catalogue @ctg of type @ct. @next is passed to the
 * m0_ctg_*() calls as the phase in which the FOM continues.
 *
 * fom->cf_ctg_op is freshly initialised for every opcode except CO_CUR,
 * whose cursor is reused across records and released by cas_done(). For
 * meta catalogues the key is the decoded id fom->cf_in_cids[@rec_pos].
 *
 * NOTE(review): several lines are not visible here: the declaration of
 * `ret`, some case labels, and one statement before the assignment of
 * rec->cr_val.u.ab_buf.
 */
static int cas_exec(struct cas_fom *fom, enum m0_cas_opcode opc,
		    enum m0_cas_type ct, struct m0_cas_ctg *ctg,
		    uint64_t rec_pos, int next)
{
	struct m0_ctg_op  *ctg_op = &fom->cf_ctg_op;
	struct m0_fom     *fom0   = &fom->cf_fom;
	uint32_t           flags  = cas_op(fom0)->cg_flags;
	struct m0_buf      lbuf   = M0_BUF_INIT0;
	struct m0_buf      kbuf;
	struct m0_buf      vbuf;
	struct m0_cas_id  *cid;
	struct m0_cas_rec *rec;
	M0_ENTRY("opc=%d ct=%d", opc, ct);

	cas_incoming_kv(fom, rec_pos, &kbuf, &vbuf);
	if (ct == CT_META)
		cid = &fom->cf_in_cids[rec_pos];
	else
		/*
		 * Initialise cid to NULL to suppress strange compiler warning.
		 * Cid is used only for meta operations, but compiler complains
		 * that it may be uninitialised.
		 */
		cid = NULL;

	if (opc != CO_CUR)
		m0_ctg_op_init(&fom->cf_ctg_op, fom0, flags);

	switch (CTG_OP_COMBINE(opc, ct)) {
		ret = m0_ctg_lookup(ctg_op, ctg, &kbuf, next);
		break;
		ret = m0_ctg_insert(ctg_op, ctg, &kbuf, &vbuf, next);
		break;
		ret = m0_ctg_lookup_delete(ctg_op, ctg, &kbuf, &lbuf, flags, next);
		if (ctg_op->co_rc == 0) {
			rec = cas_at(cas_op(fom0), rec_pos);

			/*
			 * Here @lbuf is allocated in m0_ctg_lookup_delete()
			 * and released in cas_fom_fini().
			 */
			m0_rpc_at_init(&rec->cr_val);
			rec->cr_val.u.ab_buf = lbuf;
		}
		break;
	case CTG_OP_COMBINE(CO_DEL, CT_META):
		/*
		 * This is index drop. Move record from meta to dead_index.
		 * First load record from meta, then insert into dead_index,
		 * then delete from meta.
		 * So the current step is to fetch pointer to ctg from meta.
		 * Falls through to the meta lookup below.
		 */
	case CTG_OP_COMBINE(CO_GET, CT_META):
		ret = m0_ctg_meta_lookup(ctg_op, &cid->ci_fid, next);
		break;
	case CTG_OP_COMBINE(CO_PUT, CT_META):
		ret = m0_ctg_meta_insert(ctg_op, &cid->ci_fid, next);
		break;
	case CTG_OP_COMBINE(CO_GC, CT_META):
		ret = m0_ctg_gc_wait(ctg_op, next);
		break;
	case CTG_OP_COMBINE(CO_CUR, CT_META):
		/*
		 * First step of an iteration (cf_curpos == 0): initialise the
		 * cursor if needed and position it at the start key.
		 * Subsequent steps advance it.
		 */
		if (fom->cf_curpos == 0) {
			if (!m0_ctg_cursor_is_initialised(ctg_op)) {
				if (ct == CT_META)
					m0_ctg_meta_cursor_init(ctg_op);
				else
					m0_ctg_cursor_init(ctg_op, ctg);
			}
			if (ct == CT_META)
				m0_ctg_meta_cursor_get(ctg_op, &cid->ci_fid,
						       next);
			else
				m0_ctg_cursor_get(ctg_op, &kbuf, next);
		} else if (ct == CT_META)
			m0_ctg_meta_cursor_next(ctg_op, next);
		else
			m0_ctg_cursor_next(ctg_op, next);
		break;
	}

	return ret;
}
2250 
2251 static bool cas_ctidx_op_needed(struct cas_fom *fom, enum m0_cas_opcode opc,
2252  enum m0_cas_type ct, uint64_t rec_pos)
2253 {
2254  struct m0_cas_id *cid;
2255  bool is_needed = false;
2256 
2257  switch (CTG_OP_COMBINE(opc, ct)) {
2258  case CTG_OP_COMBINE(CO_PUT, CT_META):
2259  case CTG_OP_COMBINE(CO_DEL, CT_META):
2260  cid = &fom->cf_in_cids[rec_pos];
2261  if (cas_fid_is_cctg(&cid->ci_fid))
2262  is_needed = true;
2263  break;
2264  default:
2265  break;
2266  }
2267 
2268  return is_needed;
2269 }
2270 
/**
 * If the index mask of @in_cid is non-empty, starts copying its range array
 * into BE memory (m0_ctg_mem_place()), continuing in phase @next. Otherwise
 * the FOM moves to @next directly. cas_ctidx_insert() later picks up the
 * copy.
 *
 * NOTE(review): the declaration/initialisation of `ret` is not visible here.
 */
static int cas_ctidx_mem_place(struct cas_fom *fom,
			       const struct m0_cas_id *in_cid, int next)
{
	const struct m0_dix_imask *imask;
	struct m0_fom             *fom0   = &fom->cf_fom;
	struct m0_ctg_op          *ctg_op = &fom->cf_ctg_op;
	struct m0_buf              buf;

	imask = CID_IMASK_PTR(in_cid);

	if (!m0_dix_imask_is_empty(imask)) {
		m0_ctg_op_init(ctg_op, fom0, 0);
		buf.b_nob = imask->im_nr * sizeof(imask->im_range[0]);
		buf.b_addr = imask->im_range;
		ret = m0_ctg_mem_place(ctg_op, &buf, next);
	} else
		m0_fom_phase_set(fom0, next);

	return ret;
}
2292 
/**
 * Frees the BE copy of the index-mask ranges of a layout found by the
 * preceding ctidx lookup (cas_ctidx_lookup()).
 *
 * If the lookup succeeded and the stored layout's mask is non-empty, the
 * free is started (m0_ctg_mem_free()). Otherwise the FOM moves to @next
 * directly; a failed lookup is left in fom->cf_ctg_op.
 *
 * NOTE(review): the declaration of `ret` and the last argument line of the
 * m0_ctg_mem_free() call are not visible here.
 */
static int cas_ctidx_mem_free(struct cas_fom *fom, int next)
{
	struct m0_dix_layout *layout;
	struct m0_dix_imask  *imask;
	struct m0_fom        *fom0        = &fom->cf_fom;
	struct m0_ctg_op     *ctg_op      = &fom->cf_ctg_op;
	bool                  free_called = false;
	struct m0_buf         buf;
	int                   rc;

	rc = m0_ctg_op_rc(ctg_op);
	if (rc == 0) {
		m0_ctg_lookup_result(ctg_op, &buf);
		M0_ASSERT(buf.b_nob == sizeof(struct m0_dix_layout));
		layout = (struct m0_dix_layout *)buf.b_addr;
		imask = LAYOUT_IMASK_PTR(layout);
		if (!m0_dix_imask_is_empty(imask)) {
			m0_ctg_op_fini(ctg_op);
			m0_ctg_op_init(ctg_op, fom0, 0);
			ret = m0_ctg_mem_free(ctg_op, imask->im_range,
			free_called = true;
		}
	}
	if (rc != 0 || !free_called)
		m0_fom_phase_set(fom0, next);
	return ret;
}
2322 
/**
 * If the previous ctg operation succeeded, deletes the catalogue-index
 * entry keyed by @in_cid's FID, continuing in phase @next. Otherwise the
 * FOM moves to @next directly, leaving the failed operation (and its rc) in
 * fom->cf_ctg_op.
 *
 * NOTE(review): the declaration/initialisation of `ret` is not visible here.
 */
static int cas_ctidx_delete(struct cas_fom *fom, const struct m0_cas_id *in_cid,
			    int next)
{
	struct m0_fom    *fom0   = &fom->cf_fom;
	struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
	struct m0_buf     kbuf;

	if (m0_ctg_op_rc(ctg_op) == 0) {
		m0_ctg_op_fini(ctg_op);
		m0_ctg_op_init(ctg_op, fom0, 0);
		/* The key is a component catalogue FID. */
		kbuf = M0_BUF_INIT_PTR_CONST(&in_cid->ci_fid);
		ret = m0_ctg_delete(ctg_op, m0_ctg_ctidx(), &kbuf, next);
	} else
		m0_fom_phase_set(fom0, next);
	return ret;
}
2341 
2342 static int cas_ctidx_lookup(struct cas_fom *fom, const struct m0_cas_id *in_cid,
2343  int next)
2344 {
2345  struct m0_fom *fom0 = &fom->cf_fom;
2346  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2347  struct m0_buf kbuf;
2348 
2349  m0_ctg_op_init(ctg_op, fom0, 0);
2350  /* The key is a component catalogue FID. */
2351  kbuf = M0_BUF_INIT_PTR_CONST(&in_cid->ci_fid);
2352  return m0_ctg_lookup(ctg_op, m0_ctg_ctidx(), &kbuf, next);
2353 }
2354 
2355 static int cas_ctidx_insert(struct cas_fom *fom, const struct m0_cas_id *in_cid,
2356  int next)
2357 {
2358  struct m0_fom *fom0 = &fom->cf_fom;
2359  struct m0_cas_id cid_copy = *in_cid;
2360  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2361  bool ctg_op_ok = true;
2362  int ret = M0_FSO_AGAIN;
2363  struct m0_dix_imask *imask;
2364  struct m0_buf kbuf;
2365  struct m0_buf vbuf;
2366  struct m0_buf mbuf;
2367 
2368  imask = CID_IMASK_PTR(&cid_copy);
2369  if (!m0_dix_imask_is_empty(imask)) {
2370  if (m0_ctg_op_rc(ctg_op) == 0) {
2371  m0_ctg_mem_place_get(ctg_op, &mbuf);
2372  m0_ctg_op_fini(ctg_op);
2373  M0_ASSERT(imask->im_nr ==
2374  mbuf.b_nob / sizeof(imask->im_range[0]));
2375  /*
2376  * Substitude imask range array with its BE copy before
2377  * insertion into ctidx tree.
2378  */
2379  imask->im_range = (struct m0_ext *)mbuf.b_addr;
2380  } else
2381  ctg_op_ok = false;
2382  }
2383 
2384  if (ctg_op_ok == true) {
2385  m0_ctg_op_init(ctg_op, fom0, 0);
2386  /* The key is a component catalogue FID. */
2387  kbuf = M0_BUF_INIT_PTR(&cid_copy.ci_fid);
2388  vbuf = M0_BUF_INIT_PTR(&cid_copy.ci_layout);
2389  ret = m0_ctg_insert(ctg_op, m0_ctg_ctidx(), &kbuf, &vbuf, next);
2390  }
2391 
2392  return ret;
2393 }
2394 
2395 static m0_bcount_t cas_rpc_cutoff(const struct cas_fom *fom)
2396 {
2397  return cas_in_ut() ? m0_pagesize_get() :
2398  m0_fop_rpc_machine(fom->cf_fom.fo_fop)->rm_bulk_cutoff;
2399 }
2400 
/**
 * Prepares the payload to send back for the current record by copying it
 * with cas_place() into fom->cf_out_key / fom->cf_out_val:
 * - the looked-up value (first case group);
 * - or the cursor's current key, plus its value for non-meta catalogues.
 *
 * Meta GET/DEL/PUT send no payload.
 *
 * NOTE(review): some case labels are not visible here. Also, rc is held in a
 * uint64_t and returned as int; negative codes survive this round trip only
 * by implementation-defined conversion — an int would be clearer.
 */
static int cas_prep_send(struct cas_fom *fom, enum m0_cas_opcode opc,
			 enum m0_cas_type ct)
{
	uint64_t          rc = 0;
	struct m0_buf     key;
	struct m0_buf     val;
	struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
	m0_bcount_t       rpc_cutoff = cas_rpc_cutoff(fom);

	switch (CTG_OP_COMBINE(opc, ct)) {
		m0_ctg_lookup_result(ctg_op, &val);
		rc = cas_place(&fom->cf_out_val, &val, rpc_cutoff);
		break;
	case CTG_OP_COMBINE(CO_GET, CT_META):
	case CTG_OP_COMBINE(CO_DEL, CT_META):
	case CTG_OP_COMBINE(CO_PUT, CT_META):
		/* Nothing to do: return code is all the user gets. */
		break;
	case CTG_OP_COMBINE(CO_CUR, CT_META):
		m0_ctg_cursor_kv_get(ctg_op, &key, &val);
		rc = cas_place(&fom->cf_out_key, &key, rpc_cutoff);
		if (ct == CT_BTREE && rc == 0)
			rc = cas_place(&fom->cf_out_val, &val,
				       rpc_cutoff);
		break;
	}

	return rc;
}
2434 
/**
 * Finishes the current record. It stores the final return code in reply
 * record fom->cf_opos and advances fom->cf_ipos and fom->cf_opos.
 *
 * For CO_CUR:
 * - fom->cf_curpos counts steps of the current iteration.
 * - A successful step reports cf_curpos (minus one when the start key is
 *   excluded) as its rc.
 * - While the cursor has not failed and fewer than rec->cr_rc records are
 *   done (one more when the start key is excluded), fom->cf_ipos stays on
 *   the same input record.
 * - Otherwise the cursor is released and the iteration state is reset.
 *
 * For other opcodes the ctg operation is finalised.
 *
 * -EEXIST for PUT with COF_CREATE and -ENOENT for DEL with COF_CROW are
 * reported as success. The output key/value buffers are handed over to the
 * reply and the FOM's references to them are cleared.
 *
 * @return the return code stored in the reply record.
 */
static int cas_done(struct cas_fom *fom, struct m0_cas_op *op,
		    struct m0_cas_rep *rep, enum m0_cas_opcode opc)
{
	struct m0_cas_rec *rec_out;
	struct m0_cas_rec *rec;
	int                ctg_rc = m0_ctg_op_rc(&fom->cf_ctg_op);
	int                rc;

	M0_ASSERT(fom->cf_ipos < op->cg_rec.cr_nr);
	rec_out = cas_out_at(rep, fom->cf_opos);
	rec = cas_at(op, fom->cf_ipos);

	rc = rec_out->cr_rc;
	if (opc == CO_CUR) {
		fom->cf_curpos++;
		if (rc == 0 && ctg_rc == 0)
			rc = fom->cf_startkey_excluded ?
				fom->cf_curpos - 1 : fom->cf_curpos;
		if (ctg_rc == 0 &&
		    ((fom->cf_curpos < rec->cr_rc) ||
		     (fom->cf_startkey_excluded &&
		      (fom->cf_curpos < rec->cr_rc + 1)))) {
			/*
			 * Continue with the same iteration: this decrement
			 * cancels the increment of cf_ipos below.
			 */
			--fom->cf_ipos;
		} else {
			/*
			 * End the iteration on ctg cursor error because it
			 * doesn't make sense to continue with broken iterator.
			 */
			m0_ctg_cursor_put(&fom->cf_ctg_op);
			fom->cf_curpos = 0;
			fom->cf_startkey_excluded = false;
		}
	} else
		m0_ctg_op_fini(&fom->cf_ctg_op);

	++fom->cf_ipos;
	++fom->cf_opos;
	/*
	 * Overwrite the return code of a put operation if the key already
	 * exists and COF_CREATE is set, or of a del operation if the key is
	 * not found and COF_CROW is set.
	 */
	if ((opc == CO_PUT && rc == -EEXIST && (op->cg_flags & COF_CREATE)) ||
	    (opc == CO_DEL && rc == -ENOENT && (op->cg_flags & COF_CROW)))
		rc = 0;

	M0_LOG(M0_DEBUG, "pos: %" PRId64 " rc: %d", fom->cf_opos, rc);
	rec_out->cr_rc = rc;

	/*
	 * Out buffers are passed to RPC AT layer. They will be deallocated
	 * automatically as part of a reply FOP.
	 */
	fom->cf_out_key = M0_BUF_INIT0;
	fom->cf_out_val = M0_BUF_INIT0;

	return rec_out->cr_rc;
}
2494 
/**
 * Initialises @at_buf and fills it with an inline, encoded copy of @cid.
 * It is used as the key of the meta request built by
 * cas_ctg_crow_fop_create().
 *
 * @return 0 on success, otherwise the encoding error. On error @at_buf is
 *         initialised but not filled.
 *
 * NOTE(review): the line starting the encoding call is not visible here;
 * only its trailing arguments (output address and size) are.
 */
static int cas_ctg_crow_fop_buf_prepare(const struct m0_cas_id *cid,
					struct m0_rpc_at_buf   *at_buf)
{
	struct m0_buf buf;
	int           rc;

	M0_PRE(cid != NULL);
	M0_PRE(at_buf != NULL);

	m0_rpc_at_init(at_buf);
				 (struct m0_cas_id *)cid),
				 &buf.b_addr, &buf.b_nob);
	if (rc == 0) {
		at_buf->ab_type = M0_RPC_AT_INLINE;
		at_buf->u.ab_buf = buf;
	}
	return rc;
}
2514 
/**
 * Builds a single-record request to the meta catalogue (m0_cas_meta_fid)
 * whose key is the encoded @cid. It is used to create the component
 * catalogue on demand (CROW).
 *
 * On success *@out is the new FOP. On failure everything allocated here is
 * freed and *@out stays NULL.
 *
 * NOTE(review): the statement binding @op to @fop (presumably FOP
 * initialisation) is not visible here.
 */
static int cas_ctg_crow_fop_create(const struct m0_cas_id *cid,
				   struct m0_fop         **out)
{
	struct m0_cas_op  *op;
	struct m0_cas_rec *rec;
	struct m0_fop     *fop;
	int                rc = 0;

	*out = NULL;

	M0_ALLOC_PTR(op);
	M0_ALLOC_PTR(rec);
	M0_ALLOC_PTR(fop);
	if (op == NULL || rec == NULL || fop == NULL)
		rc = -ENOMEM;
	if (rc == 0) {
		rc = cas_ctg_crow_fop_buf_prepare(cid, &rec->cr_key);
		if (rc == 0) {
			op->cg_id.ci_fid = m0_cas_meta_fid;
			op->cg_rec.cr_nr = 1;
			op->cg_rec.cr_rec = rec;
			*out = fop;
		}
	}
	if (rc != 0) {
		m0_free(op);
		m0_free(rec);
		m0_free(fop);
	}
	return rc;
}
2547 
				 struct m0_fom *serf)
{
	/*
	 * Completion callback for the FOM (@serf) enthralled to the cas_fom
	 * that embeds @thrall. It stores in the leader's cf_thrall_rc the
	 * first non-zero value among:
	 * - the serf FOM's rc;
	 * - its reply's cgr_rc;
	 * - the cr_rc of the reply's single record.
	 * NOTE(review): the first line of this function's signature is not
	 * visible here.
	 */
	struct cas_fom    *leader = M0_AMB(leader, thrall, cf_thrall);
	struct m0_cas_rep *rep;
	int                rc;

	rc = m0_fom_rc(serf);
	if (rc == 0) {
		M0_ASSERT(serf->fo_rep_fop != NULL);
		rep = (struct m0_cas_rep *)m0_fop_data(serf->fo_rep_fop);
		M0_ASSERT(rep != NULL);
		rc = rep->cgr_rc;
		if (rc == 0) {
			M0_ASSERT(rep->cgr_rep.cr_nr == 1);
			rc = rep->cgr_rep.cr_rec[0].cr_rc;
		}
	}
	leader->cf_thrall_rc = rc;
}
2568 
2569 static void cas_addb2_fom_to_crow_fom(const struct m0_fom *fom0,
2570  const struct m0_fom *crow_fom0)
2571 {
2572  uint64_t fom0_sm_id = m0_sm_id_get(&fom0->fo_sm_phase);
2573  uint64_t crow_fom0_sm_id = m0_sm_id_get(&crow_fom0->fo_sm_phase);
2574 
2575  M0_ADDB2_ADD(M0_AVI_CAS_FOM_TO_CROW_FOM, fom0_sm_id, crow_fom0_sm_id);
2576 }
2577 
2578 M0_INTERNAL int m0_cas_fom_spawn(
2579  struct m0_fom *lead,
2580  struct m0_fom_thralldom *thrall,
2581  struct m0_fop *cas_fop,
2582  void (*on_fom_complete)(struct m0_fom_thralldom *,
2583  struct m0_fom *))
2584 {
2585  struct m0_fom *new_fom0;
2586  struct m0_reqh *reqh;
2587  struct m0_rpc_machine *mach;
2588  int rc;
2589 
2590  reqh = lead->fo_service->rs_reqh;
2591  mach = m0_reqh_rpc_mach_tlist_head(&reqh->rh_rpc_machines);
2592  m0_fop_rpc_machine_set(cas_fop, mach);
2593  cas_fop->f_item.ri_session = lead->fo_fop->f_item.ri_session;
2594  rc = cas_fom_create(cas_fop, &new_fom0, reqh);
2595  if (rc == 0) {
2596  new_fom0->fo_local = true;
2597  m0_fom_enthrall(lead,
2598  new_fom0,
2599  thrall,
2600  on_fom_complete);
2601  m0_fom_queue(new_fom0);
2602  cas_addb2_fom_to_crow_fom(lead, new_fom0);
2603  }
2604  /*
2605  * New FOM got reference to FOP, release ref counter as this
2606  * FOP is not needed here.
2607  */
2608  m0_fop_put_lock(cas_fop);
2609 
2610  return M0_RC(rc);
2611 }
2612 
/**
 * Creates component catalogue @cid on demand (CROW). It builds a meta
 * request with cas_ctg_crow_fop_create() and spawns a CAS FOM for it,
 * enthralled to @fom through fom->cf_thrall.
 *
 * @return 0 on success, or the first error from the two steps.
 *
 * NOTE(review): the trailing arguments of m0_cas_fom_spawn() (the FOP and
 * the completion callback) are not visible here.
 */
static int cas_ctg_crow_handle(struct cas_fom *fom, const struct m0_cas_id *cid)
{
	struct m0_fop *fop;
	int            rc;

	/*
	 * Create fop to create component catalogue. For a new CAS FOM this FOP
	 * will appear as arrived from the network. FOP will be deallocated by a
	 * new CAS FOM.
	 */
	rc = cas_ctg_crow_fop_create(cid, &fop) ?:
		m0_cas_fom_spawn(&fom->cf_fom,
				 &fom->cf_thrall,
	return rc;
}
2629 
2630 static bool cas_fom_invariant(const struct cas_fom *fom)
2631 {
2632  const struct m0_fom *fom0 = &fom->cf_fom;
2633  int phase = m0_fom_phase(fom0);
2634  struct m0_cas_op *op = cas_op(fom0);
2635  struct cas_service *service = M0_AMB(service,
2636  fom0->fo_service, c_service);
2637 
2638  return _0C(ergo(phase > M0_FOPH_INIT && phase != M0_FOPH_FAILURE,
2639  fom->cf_ipos <= op->cg_rec.cr_nr)) &&
2640  _0C(phase <= CAS_NR);
2641 }
2642 
2643 static void cas_fom_addb2_descr(struct m0_fom *fom)
2644 {
2645  struct m0_cas_op *op = cas_op(fom);
2646  struct m0_cas_rec *rec;
2647  int i;
2648 
2649  for (i = 0; i < op->cg_rec.cr_nr; i++) {
2650  rec = cas_at(op, i);
2651  M0_ADDB2_ADD(M0_AVI_CAS_KV_SIZES, FID_P(&op->cg_id.ci_fid),
2652  m0_rpc_at_len(&rec->cr_key),
2653  m0_rpc_at_len(&rec->cr_val));
2654  }
2655 }
2656 
2657 static const struct m0_fom_ops cas_fom_ops = {
2658  .fo_tick = &cas_fom_tick,
2659  .fo_home_locality = &cas_fom_home_locality,
2660  .fo_fini = &cas_fom_fini,
2661  .fo_addb2_descr = &cas_fom_addb2_descr
2662 };
2663 
/**
 * FOM type operations of CAS FOMs.
 *
 * NOTE(review): no member initialiser is visible here (presumably the FOM
 * creation callback); confirm against the full source.
 */
static const struct m0_fom_type_ops cas_fom_type_ops = {
};
2667 
2668 static struct m0_sm_state_descr cas_fom_phases[] = {
2669  [CAS_CHECK_PRE] = {
2670  .sd_name = "cas-op-check-prepare",
2671  .sd_allowed = M0_BITS(CAS_CHECK, M0_FOPH_FAILURE)
2672  },
2673  [CAS_CHECK] = {
2674  .sd_name = "cas-op-check",
2675  .sd_allowed = M0_BITS(M0_FOPH_INIT, M0_FOPH_FAILURE)
2676  },
2677  [CAS_START] = {
2678  .sd_name = "start",
2679  .sd_allowed = M0_BITS(CAS_META_LOCK, CAS_LOAD_KEY)
2680  },
2681  [CAS_META_LOCK] = {
2682  .sd_name = "meta-lock",
2683  .sd_allowed = M0_BITS(CAS_META_LOOKUP)
2684  },
2685  [CAS_META_LOOKUP] = {
2686  .sd_name = "meta-lookup",
2688  },
2689  [CAS_META_LOOKUP_DONE] = {
2690  .sd_name = "meta-lookup-done",
2691  .sd_allowed = M0_BITS(CAS_CTG_CROW_DONE, CAS_LOAD_KEY,
2693  },
2694  [CAS_CTG_CROW_DONE] = {
2695  .sd_name = "ctg-crow-done",
2696  .sd_allowed = M0_BITS(CAS_START, M0_FOPH_FAILURE)
2697  },
2698  [CAS_LOCK] = {
2699  .sd_name = "lock",
2700  .sd_allowed = M0_BITS(CAS_CTIDX_LOCK, CAS_PREP)
2701  },
2702  [CAS_CTIDX_LOCK] = {
2703  .sd_name = "ctidx_lock",
2704  .sd_allowed = M0_BITS(CAS_PREP)
2705  },
2706  [CAS_LOAD_KEY] = {
2707  .sd_name = "load-key",
2708  .sd_allowed = M0_BITS(CAS_LOAD_VAL)
2709  },
2710  [CAS_LOAD_VAL] = {
2711  .sd_name = "load-value",
2712  .sd_allowed = M0_BITS(CAS_LOAD_DONE)
2713  },
2714  [CAS_LOAD_DONE] = {
2715  .sd_name = "load-done",
2716  .sd_allowed = M0_BITS(CAS_LOAD_KEY, CAS_LOCK,
2718  },
2719  [CAS_PREP] = {
2720  .sd_name = "prep",
2721  .sd_allowed = M0_BITS(M0_FOPH_TXN_OPEN, M0_FOPH_FAILURE)
2722  },
2723  [CAS_TXN_OPENED] = {
2724  .sd_name = "txn-opened",
2725  .sd_allowed = M0_BITS(CAS_META_UNLOCK, CAS_LOOP)
2726  },
2727  [CAS_META_UNLOCK] = {
2728  .sd_name = "meta-unlock",
2729  .sd_allowed = M0_BITS(CAS_LOOP)
2730  },
2731  [CAS_LOOP] = {
2732  .sd_name = "loop",
2733  .sd_allowed = M0_BITS(CAS_CTIDX, CAS_DTM0, CAS_INSERT_TO_DEAD,
2736  },
2737 
2738 
2739  [CAS_CTIDX] = {
2740  .sd_name = "ctidx",
2743  },
2744  [CAS_CTIDX_MEM_PLACE] = {
2745  .sd_name = "ctidx-im-range-alloc",
2746  .sd_allowed = M0_BITS(CAS_CTIDX_INSERT)
2747  },
2748  [CAS_CTIDX_INSERT] = {
2749  .sd_name = "ctidx-insert",
2750  .sd_allowed = M0_BITS(CAS_PREPARE_SEND, CAS_IDROP_LOOP)
2751  },
2752 
2753  [CAS_CTIDX_LOOKUP] = {
2754  .sd_name = "ctidx-lookup",
2755  .sd_allowed = M0_BITS(CAS_CTIDX_MEM_FREE)
2756  },
2757  [CAS_CTIDX_MEM_FREE] = {
2758  .sd_name = "ctidx-im-range-free",
2759  .sd_allowed = M0_BITS(CAS_PREPARE_SEND, CAS_CTIDX_DELETE)
2760  },
2761  [CAS_CTIDX_DELETE] = {
2762  .sd_name = "ctidx-delete",
2763  .sd_allowed = M0_BITS(CAS_PREPARE_SEND, CAS_IDROP_LOOP)
2764  },
2765 
2766 
2767 
2768  [CAS_DONE] = {
2769  .sd_name = "done",
2770  .sd_allowed = M0_BITS(CAS_LOOP, CAS_IDROP_LOCK_LOOP)
2771  },
2772  [CAS_PREPARE_SEND] = {
2773  .sd_name = "prep-send",
2774  .sd_allowed = M0_BITS(CAS_SEND_KEY, CAS_DONE, CAS_LOOP)
2775  },
2776  [CAS_SEND_KEY] = {
2777  .sd_name = "send-key",
2778  .sd_allowed = M0_BITS(CAS_KEY_SENT, CAS_SEND_VAL)
2779  },
2780  [CAS_KEY_SENT] = {
2781  .sd_name = "key-sent",
2782  .sd_allowed = M0_BITS(CAS_SEND_VAL)
2783  },
2784  [CAS_SEND_VAL] = {
2785  .sd_name = "send-val",
2786  .sd_allowed = M0_BITS(CAS_VAL_SENT, CAS_DONE)
2787  },
2788  [CAS_VAL_SENT] = {
2789  .sd_name = "val-sent",
2790  .sd_allowed = M0_BITS(CAS_DONE)
2791  },
2792  [CAS_DEAD_INDEX_LOCK] = {
2793  .sd_name = "dead-index-lock",
2794  .sd_allowed = M0_BITS(CAS_LOCK)
2795  },
2796  [CAS_INSERT_TO_DEAD] = {
2797  .sd_name = "insert-dead-index",
2799  },
2800  [CAS_DELETE_FROM_META] = {
2801  .sd_name = "detele-from-meta",
2802  .sd_allowed = M0_BITS(CAS_CTIDX, CAS_IDROP_LOOP,
2804  },
2805  [CAS_IDROP_LOOP] = {
2806  .sd_name = "index-drop-loop",
2807  .sd_allowed = M0_BITS(CAS_LOOP, CAS_IDROP_LOCK_LOOP,
2809  },
2810  [CAS_IDROP_LOCK_LOOP] = {
2811  .sd_name = "index-drop-lock-loop",
2813  CAS_LOOP)
2814  },
2815  [CAS_IDROP_LOCKED] = {
2816  .sd_name = "index-drop-locked",
2817  .sd_allowed = M0_BITS(CAS_PREPARE_SEND)
2818  },
2819  [CAS_IDROP_START_GC] = {
2820  .sd_name = "index-drop-start-gc",
2821  .sd_allowed = M0_BITS(M0_FOPH_SUCCESS)
2822  },
2823  [CAS_DTM0] = {
2824  .sd_name = "dtm0",
2825  .sd_allowed = M0_BITS(M0_FOPH_SUCCESS, M0_FOPH_FAILURE)
2826  },
2827 };
2828 
2831  { "cas-op-check-prepare", M0_FOPH_INIT, CAS_CHECK_PRE },
2832  { "cas-op-check", CAS_CHECK_PRE, CAS_CHECK },
2833  { "cas-op-check_pre_failed", CAS_CHECK_PRE, M0_FOPH_FAILURE },
2834  { "cas-op-checked", CAS_CHECK, M0_FOPH_INIT },
2835  { "cas-op-check-failed", CAS_CHECK, M0_FOPH_FAILURE },
2836  { "tx-initialised", M0_FOPH_TXN_OPEN, CAS_START },
2837  { "ctg-op?", CAS_START, CAS_META_LOCK },
2838  { "meta-op?", CAS_START, CAS_LOAD_KEY },
2839  { "meta-locked", CAS_META_LOCK, CAS_META_LOOKUP },
2840  { "meta-lookup-launched", CAS_META_LOOKUP, CAS_META_LOOKUP_DONE },
2841  { "key-alloc-failure", CAS_META_LOOKUP, M0_FOPH_FAILURE },
2842  { "meta-lookup-done", CAS_META_LOOKUP_DONE, CAS_LOAD_KEY },
2843  { "meta-lookup-fail", CAS_META_LOOKUP_DONE, M0_FOPH_FAILURE },
2844  { "ctg-crow-done", CAS_META_LOOKUP_DONE, CAS_CTG_CROW_DONE },
2845  { "ctg-crow-success", CAS_CTG_CROW_DONE, CAS_START },
2846  { "ctg-crow-fail", CAS_CTG_CROW_DONE, M0_FOPH_FAILURE },
2847  { "index-locked", CAS_LOCK, CAS_PREP },
2848  { "key-loaded", CAS_LOAD_KEY, CAS_LOAD_VAL },
2849  { "val-loaded", CAS_LOAD_VAL, CAS_LOAD_DONE },
2850  { "more-kv-to-load", CAS_LOAD_DONE, CAS_LOAD_KEY },
2851  { "meta-locked", CAS_LOCK, CAS_CTIDX_LOCK },
2852  { "ctidx-locked", CAS_CTIDX_LOCK, CAS_PREP },
2853  { "load-finished", CAS_LOAD_DONE, CAS_LOCK },
2854  { "load-finished-idrop", CAS_LOAD_DONE, CAS_DEAD_INDEX_LOCK },
2855  { "kv-setup-failure", CAS_LOAD_DONE, M0_FOPH_FAILURE },
2856  { "index-locked", CAS_LOCK, CAS_PREP },
2857  { "meta-locked", CAS_LOCK, CAS_CTIDX_LOCK },
2858  { "tx-credit-calculated", CAS_PREP, M0_FOPH_TXN_OPEN },
2859  { "keys-vals-invalid", CAS_PREP, M0_FOPH_FAILURE },
2860  { "txn-opened-ctg-op?", CAS_TXN_OPENED, CAS_META_UNLOCK },
2861  { "txn-opened-meta-op?", CAS_TXN_OPENED, CAS_LOOP },
2862  { "meta-unlocked", CAS_META_UNLOCK, CAS_LOOP },
2863  { "all-done?", CAS_LOOP, M0_FOPH_SUCCESS },
2864  { "reply-too_large", CAS_LOOP, M0_FOPH_FAILURE },
2865  { "do-ctidx-op", CAS_LOOP, CAS_CTIDX },
2866  { "op-launched", CAS_LOOP, CAS_PREPARE_SEND },
2867  { "do-dtm0-op", CAS_LOOP, CAS_DTM0 },
2868  { "ready-to-send", CAS_PREPARE_SEND, CAS_SEND_KEY },
2869  { "next-key", CAS_PREPARE_SEND, CAS_LOOP },
2870  { "prep-error", CAS_PREPARE_SEND, CAS_DONE },
2871  { "key-sent", CAS_SEND_KEY, CAS_KEY_SENT },
2872  { "skip-key-sending", CAS_SEND_KEY, CAS_SEND_VAL },
2873  { "send-val", CAS_KEY_SENT, CAS_SEND_VAL },
2874  { "val-sent", CAS_SEND_VAL, CAS_VAL_SENT },
2875  { "skip-val-sending", CAS_SEND_VAL, CAS_DONE },
2876  { "processing-done", CAS_VAL_SENT, CAS_DONE },
2877  { "goto-next-rec", CAS_DONE, CAS_LOOP },
2878  { "ctidx-insert", CAS_CTIDX, CAS_CTIDX_MEM_PLACE },
2879  { "ctidx-delete", CAS_CTIDX, CAS_CTIDX_LOOKUP },
2880  { "op-failed", CAS_CTIDX, CAS_PREPARE_SEND },
2881 
2882  { "ctidx-mem-place", CAS_CTIDX_MEM_PLACE, CAS_CTIDX_INSERT },
2883  { "ctidx-do-insert", CAS_CTIDX_INSERT, CAS_PREPARE_SEND },
2884  { "ctidx-ins-idx-drop", CAS_CTIDX_INSERT, CAS_IDROP_LOOP },
2885 
2886  { "ctidx-lookup", CAS_CTIDX_LOOKUP, CAS_CTIDX_MEM_FREE },
2887  { "ctidx-mem-free", CAS_CTIDX_MEM_FREE, CAS_CTIDX_DELETE },
2888  { "ctidx-lookup-failed", CAS_CTIDX_MEM_FREE, CAS_PREPARE_SEND },
2889  { "ctidx-do-delete", CAS_CTIDX_DELETE, CAS_PREPARE_SEND },
2890  { "ctidx-del-idx-drop", CAS_CTIDX_DELETE, CAS_IDROP_LOOP },
2891  { "key-add-reply", CAS_DONE, CAS_LOOP },
2892  { "op-launched", CAS_LOOP, CAS_INSERT_TO_DEAD },
2893  { "idx-drop-reply-sent", CAS_DONE, CAS_IDROP_LOCK_LOOP },
2894  { "dead-index-locked", CAS_DEAD_INDEX_LOCK, CAS_LOCK },
2895  { "dead-index-inserted", CAS_INSERT_TO_DEAD, CAS_DELETE_FROM_META },
2896  { "meta-lookup-fail", CAS_INSERT_TO_DEAD, CAS_PREPARE_SEND },
2897  { "meta-deleted", CAS_DELETE_FROM_META, CAS_IDROP_LOOP },
2898  { "meta-deleted-ctidx", CAS_DELETE_FROM_META, CAS_CTIDX },
2899  { "dead-index-ins-fail", CAS_DELETE_FROM_META, CAS_PREPARE_SEND },
2900  { "idx-drop-start-lock", CAS_IDROP_LOOP, CAS_IDROP_LOCK_LOOP },
2901  { "idx-drop-next", CAS_IDROP_LOOP, CAS_LOOP },
2902  { "idx-lock-failed", CAS_IDROP_LOOP, CAS_PREPARE_SEND },
2903  { "idx-drop-locking", CAS_IDROP_LOCK_LOOP, CAS_IDROP_LOCKED },
2904  { "idx-drop-locked", CAS_IDROP_LOCK_LOOP, CAS_IDROP_START_GC },
2905  { "idx-drop-skip-lock", CAS_IDROP_LOCK_LOOP, CAS_LOOP },
2906  { "idx-dropped-ok", CAS_IDROP_LOCKED, CAS_PREPARE_SEND },
2907  { "idx-drop-all-done", CAS_IDROP_START_GC, M0_FOPH_SUCCESS },
2908 
2909  { "dtm0-op-done", CAS_DTM0, M0_FOPH_SUCCESS },
2910  { "dtm0-op-fail", CAS_DTM0, M0_FOPH_FAILURE },
2911 
2912  { "ut-short-cut", M0_FOPH_QUEUE_REPLY, M0_FOPH_TXN_LOGGED_WAIT }
2913 };
2914 
2915 static struct m0_sm_conf cas_sm_conf = {
2916  .scf_name = "cas-fom",
2917  .scf_nr_states = ARRAY_SIZE(cas_fom_phases),
2918  .scf_state = cas_fom_phases,
2919  .scf_trans_nr = ARRAY_SIZE(cas_fom_trans),
2920  .scf_trans = cas_fom_trans
2921 };
2922 
2923 static const struct m0_reqh_service_type_ops cas_service_type_ops = {
2925 };
2926 
2927 static const struct m0_reqh_service_ops cas_service_ops = {
2929  .rso_start = &cas_service_start,
2930  .rso_prepare_to_stop = &cas_service_prepare_to_stop,
2931  .rso_stop = &cas_service_stop,
2932  .rso_fini = &cas_service_fini
2933 };
2934 
2936  .rst_name = "M0_CST_CAS",
2937  .rst_ops = &cas_service_type_ops,
2938  .rst_level = M0_RS_LEVEL_NORMAL,
2939  .rst_typecode = M0_CST_CAS
2940 };
2941 
2942 #undef M0_TRACE_SUBSYSTEM
2943 
2946 /*
2947  * Local variables:
2948  * c-indentation-style: "K&R"
2949  * c-basic-offset: 8
2950  * tab-width: 8
2951  * fill-column: 80
2952  * scroll-step: 1
2953  * End:
2954  */
2955 /*
2956  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
2957  */
static int cas_key_send(struct cas_fom *fom, const struct m0_cas_op *op, enum m0_cas_opcode opc, const struct m0_cas_rep *rep, enum cas_fom_phase next_phase)
Definition: service.c:858
uint64_t cr_rc
Definition: cas.h:340
Definition: cas.h:356
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
Definition: reqh_service.c:560
struct m0_poolmach_state * pm_state
Definition: pool_machine.h:169
static uint64_t cas_in_nr(const struct m0_fop *fop)
Definition: service.c:2011
M0_INTERNAL void m0_long_lock_link_init(struct m0_long_lock_link *link, struct m0_fom *fom, struct m0_long_lock_addb2 *addb2)
Definition: fom_long_lock.c:66
cas_fom_phase
Definition: service.c:376
static void cas_ctg_crow_done_cb(struct m0_fom_thralldom *thrall, struct m0_fom *serf)
Definition: service.c:2548
uint32_t rit_opcode
Definition: item.h:474
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_ctg_ctidx_insert_credits(struct m0_cas_id *cid, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1962
M0_INTERNAL void m0_ctg_delete_credit(struct m0_cas_ctg *ctg, m0_bcount_t knob, m0_bcount_t vnob, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1913
struct m0_rpc_at_buf cr_val
Definition: cas.h:301
Definition: cas.h:353
struct m0_dtm0_tx_desc cg_txd
Definition: cas.h:400
#define M0_PRE(cond)
M0_INTERNAL void m0_sm_conf_init(struct m0_sm_conf *conf)
Definition: sm.c:340
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static int cas_op_check(struct m0_cas_op *op, struct cas_fom *fom, bool is_index_drop)
Definition: service.c:1861
M0_INTERNAL int m0_ctg_meta_insert(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1336
Definition: cas.h:355
#define M0_BUF_INIT_PTR_CONST(p)
Definition: buf.h:73
uint64_t cf_curpos
Definition: service.c:337
static int cas_place(struct m0_buf *dst, struct m0_buf *src, m0_bcount_t cutoff)
Definition: service.c:2048
stats_kv_io
Definition: service.c:298
M0_INTERNAL int m0_ctg_dead_index_insert(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, int next_phase)
Definition: ctg_store.c:1405
struct m0_rpc_at_buf ck_key
Definition: cas.h:127
struct m0_fop * fo_fop
Definition: fom.h:490
int const char const void size_t int flags
Definition: dir.c:328
struct m0_dtm0_tx_desc cg_txd
Definition: cas.h:307
static int cas_prep_send(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct)
Definition: service.c:2401
#define M0_FOM_LONG_LOCK_RETURN(rc)
#define NULL
Definition: misc.h:38
uint32_t pst_nr_devices
Definition: pool_machine.h:108
struct m0_long_lock_addb2 cf_dead_index_addb2
Definition: service.c:365
M0_INTERNAL struct m0_dtm0_service * m0_dtm0_service_find(const struct m0_reqh *reqh)
Definition: service.c:405
static struct m0_bufvec dst
Definition: xform.c:61
M0_INTERNAL int m0_ctg_delete(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1540
struct m0_long_lock_addb2 cf_del_lock_addb2
Definition: service.c:366
M0_INTERNAL int m0_ctg_lookup_delete(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, struct m0_buf *val, int flags, int next_phase)
Definition: ctg_store.c:1465
struct m0_reqh_service c_service
Definition: service.c:312
Definition: idx_mock.c:52
#define ergo(a, b)
Definition: misc.h:293
uint64_t cf_kv_stats[STATS_KV_NR][STATS_KV_IO_NR][STATS_NR]
Definition: service.c:373
Definition: cas.h:350
m0_be_tx_state
Definition: tx.h:214
Definition: sm.h:350
Definition: cas.h:351
void * b_addr
Definition: buf.h:39
M0_INTERNAL struct m0_pool_version * m0_pool_version_find(struct m0_pools_common *pc, const struct m0_fid *id)
Definition: pool.c:586
M0_INTERNAL void m0_ctg_lookup_result(struct m0_ctg_op *ctg_op, struct m0_buf *buf)
Definition: ctg_store.c:1570
int(* fo_tick)(struct m0_fom *fom)
Definition: fom.h:663
M0_INTERNAL bool m0_buf_eq(const struct m0_buf *x, const struct m0_buf *y)
Definition: buf.c:90
M0_INTERNAL void m0_cas_svc_init(void)
Definition: service.c:515
M0_INTERNAL int m0_ctg_gc_wait(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1349
M0_INTERNAL struct m0_long_lock * m0_ctg_lock(struct m0_cas_ctg *ctg)
Definition: ctg_store.c:2162
M0_INTERNAL void m0_ctg_store_fini(void)
Definition: ctg_store.c:870
M0_INTERNAL void m0_ctg_mem_place_get(struct m0_ctg_op *ctg_op, struct m0_buf *buf)
Definition: ctg_store.c:2105
uint64_t cv_nr
Definition: cas.h:135
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:78
M0_INTERNAL int m0_ctg_meta_cursor_next(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1694
#define M0_LOG(level,...)
Definition: trace.h:167
M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
Definition: at.c:433
static void cas_fom_success(struct cas_fom *fom, enum m0_cas_opcode opc)
Definition: service.c:1075
Definition: cas.h:364
M0_INTERNAL bool m0_rpc_at_is_set(const struct m0_rpc_at_buf *ab)
Definition: at.c:492
static void cas_fom_cleanup(struct cas_fom *fom, bool ctg_op_fini)
Definition: service.c:976
static int cas_dtm0_logrec_add(struct m0_fom *fom0, enum m0_dtm0_tx_pa_state state)
Definition: service.c:1048
static void cas_service_fini(struct m0_reqh_service *service)
Definition: service.c:595
struct m0_cas_id * cf_in_cids
Definition: service.c:352
Definition: cas.h:247
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
struct m0_long_lock_link cf_del_lock
Definition: service.c:335
struct cas_kv * cf_ikv
Definition: service.c:344
struct m0_buf ckv_key
Definition: service.c:317
M0_INTERNAL void m0_cas_id_fini(struct m0_cas_id *cid)
Definition: cas.c:199
struct m0_long_lock_link cf_dead_index
Definition: service.c:330
uint64_t sd_allowed
Definition: sm.h:422
M0_INTERNAL int m0_ctg_lookup(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1555
static bool cas_fom_invariant(const struct cas_fom *fom)
Definition: service.c:2630
M0_INTERNAL const struct m0_fid m0_cas_meta_fid
Definition: cas.c:147
M0_INTERNAL void m0_cas__ut_svc_be_set(struct m0_reqh_service *svc, struct m0_be_domain *dom)
Definition: service.c:545
struct m0_dix_layout ci_layout
Definition: cas.h:120
static bool cas_max_reply_payload_exceeded(struct cas_fom *fom)
Definition: service.c:922
M0_INTERNAL const struct m0_fid_type m0_cas_index_fid_type
Definition: cas.c:159
static enum m0_cas_type cas_type(const struct m0_fom *fom)
Definition: service.c:2003
static void cas_update_kv_stats(struct cas_fom *fom, const struct m0_rpc_at_buf *ab, m0_bcount_t nob, enum stats_kv kv, enum stats_kv_io kv_io)
Definition: service.c:751
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:219
static struct m0_cas_rec * cas_at(struct m0_cas_op *op, int idx)
Definition: service.c:2129
M0_INTERNAL void m0_sm_conf_trans_extend(const struct m0_sm_conf *base, struct m0_sm_conf *sub)
Definition: sm.c:726
uint64_t cf_opos
Definition: service.c:324
M0_INTERNAL void m0_sm_conf_extend(const struct m0_sm_state_descr *base, struct m0_sm_state_descr *sub, uint32_t nr)
Definition: sm.c:763
struct m0_cas_recv cgr_rep
Definition: cas.h:431
struct m0_dtx fo_tx
Definition: fom.h:498
static struct m0_addb2_mach * mach
Definition: storage.c:42
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL int m0_pageshift_get(void)
Definition: memory.c:238
static void cas_addb2_fom_to_crow_fom(const struct m0_fom *fom0, const struct m0_fom *crow_fom0)
Definition: service.c:2569
M0_INTERNAL bool m0_dtm0_tx_desc__invariant(const struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:49
static int void * buf
Definition: dir.c:1019
M0_INTERNAL int m0_cas_fom_spawn(struct m0_fom *lead, struct m0_fom_thralldom *thrall, struct m0_fop *cas_fop, void(*on_fom_complete)(struct m0_fom_thralldom *, struct m0_fom *))
Definition: service.c:2578
M0_INTERNAL struct m0_cas_ctg * m0_ctg_dead_index(void)
Definition: ctg_store.c:2140
M0_INTERNAL int m0_rpc_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom, int next_phase)
Definition: at.c:414
M0_INTERNAL int m0_ctg_cursor_get(struct m0_ctg_op *ctg_op, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1660
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
struct m0_fom_thralldom cf_thrall
Definition: service.c:359
static void cas_at_fini(struct m0_rpc_at_buf *ab)
Definition: service.c:732
static void cas_fom_addb2_descr(struct m0_fom *fom)
Definition: service.c:2643
#define LAYOUT_IMASK_PTR(l)
Definition: service.c:424
const struct m0_sm_conf m0_generic_conf
Definition: fom_generic.c:838
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
M0_INTERNAL void m0_ctg_insert_credit(struct m0_cas_ctg *ctg, m0_bcount_t knob, m0_bcount_t vnob, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1905
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
m0_fom_phase
Definition: fom.h:372
static struct m0_be_tx * m0_fom_tx(struct m0_fom *fom)
Definition: fom.h:537
Definition: sock.c:887
static bool cas_is_ro(enum m0_cas_opcode opc)
Definition: service.c:1998
static const struct m0_fid * cas_fid(const struct m0_fom *fom)
Definition: service.c:1757
M0_INTERNAL int m0_dtm0_on_committed(struct m0_fom *fom, const struct m0_dtm0_tid *id)
Definition: fop.c:302
static struct m0_pools_common pc
Definition: iter_ut.c:59
M0_INTERNAL int m0_rpc_at_reply(struct m0_rpc_at_buf *in, struct m0_rpc_at_buf *out, struct m0_buf *repbuf, struct m0_fom *fom, int next_phase)
Definition: at.c:528
struct m0_fom_type ft_fom_type
Definition: fop.h:233
struct m0_rpc_at_buf cr_val
Definition: cas.h:182
struct m0_pooldev * pst_devices_array
Definition: pool_machine.h:111
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL bool m0_ctg_cursor_is_initialised(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1633
return M0_RC(rc)
struct m0_be_dtm0_log * dos_log
Definition: service.h:50
op
Definition: libdemo.c:64
M0_INTERNAL void m0_cas_gc_init(void)
Definition: index_gc.c:426
struct m0_dtm0_tid dtd_id
Definition: tx_desc.h:121
M0_INTERNAL int m0_ctg_mem_free(struct m0_ctg_op *ctg_op, void *area, int next_phase)
Definition: ctg_store.c:2117
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL int m0_pagesize_get(void)
Definition: memory.c:233
Definition: buf.h:37
struct m0_cas_rec * cr_rec
Definition: cas.h:236
static int cas_buf_cid_decode(struct m0_buf *enc_buf, struct m0_cas_id *cid)
Definition: service.c:2029
M0_INTERNAL int m0_ctg_insert(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, const struct m0_buf *val, int next_phase)
Definition: ctg_store.c:1448
struct m0_rpc_at_buf ck_val
Definition: cas.h:128
M0_INTERNAL int m0_ctg_cursor_next(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1680
struct m0_rpc_at_buf cr_key
Definition: cas.h:172
M0_INTERNAL int m0_ctg_meta_lookup(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1362
int opcode
Definition: crate.c:301
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
M0_INTERNAL bool m0_long_write_lock(struct m0_long_lock *lk, struct m0_long_lock_link *link, int next_phase)
M0_INTERNAL void m0_ctg_cursor_fini(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1749
int i
Definition: dir.c:1033
void m0_fop_rpc_machine_set(struct m0_fop *fop, struct m0_rpc_machine *mach)
Definition: fop.c:351
Definition: cas.h:359
struct m0_fop_type * f_type
Definition: fop.h:82
#define PRIu64
Definition: types.h:58
uint64_t cr_nr
Definition: cas.h:235
struct m0_fom cf_fom
Definition: service.c:322
M0_INTERNAL bool m0_long_lock(struct m0_long_lock *lock, bool write, struct m0_long_lock_link *link, int next_phase)
static m0_bcount_t cas_kv_nob(const struct m0_buf *inbuf)
Definition: service.c:2074
struct m0_rpc_machine * m0_fop_rpc_machine(const struct m0_fop *fop)
Definition: fop.c:359
static const struct m0_reqh_service_type_ops cas_service_type_ops
Definition: service.c:509
static int cas_service_type_allocate(struct m0_reqh_service **service, const struct m0_reqh_service_type *st)
Definition: service.c:604
return M0_ERR(-EOPNOTSUPP)
m0_bcount_t rm_bulk_cutoff
Definition: rpc_machine.h:157
struct m0_long_lock_link cf_ctidx
Definition: service.c:329
M0_INTERNAL bool m0_dix_layout_eq(const struct m0_dix_layout *layout1, const struct m0_dix_layout *layout2)
Definition: layout.c:322
static int cas_ctidx_mem_free(struct cas_fom *fom, int next)
Definition: service.c:2293
static int key
Definition: locality.c:283
Definition: cas.h:348
M0_INTERNAL void m0_ctg_cursor_put(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1744
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
struct m0_fom_thralldom thrall
Definition: ms_fom_ut.c:110
M0_INTERNAL int m0_ctg_mem_place(struct m0_ctg_op *ctg_op, const struct m0_buf *buf, int next_phase)
Definition: ctg_store.c:2090
static const struct socktype stype[]
Definition: sock.c:1156
if(value==NULL)
Definition: dir.c:350
Definition: cas.h:264
#define CTG_OP_COMBINE(opc, ct)
Definition: ctg_store.h:241
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta(void)
Definition: ctg_store.c:2130
#define ENABLE_DTM0
Definition: config.h:36
int m0_fom_tick_generic(struct m0_fom *fom)
Definition: fom_generic.c:848
static int cas_ctg_crow_fop_create(const struct m0_cas_id *cid, struct m0_fop **out)
Definition: service.c:2515
M0_INTERNAL void m0_long_unlock(struct m0_long_lock *lock, struct m0_long_lock_link *link)
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
m0_bcount_t b_nob
Definition: buf.h:38
#define M0_ASSERT(cond)
M0_INTERNAL void m0_cas_svc_fini(void)
Definition: service.c:529
struct m0_buf ckv_val
Definition: service.c:318
struct m0_long_lock_addb2 cf_ctidx_addb2
Definition: service.c:364
const char * scf_name
Definition: sm.h:352
struct m0_buf cf_out_val
Definition: service.c:369
struct m0_xcode_type * m0_cas_id_xc
Definition: cas_xc.c:11
uint64_t cf_in_cids_nr
Definition: service.c:354
struct m0_cas_ctg * cf_ctg
Definition: service.c:326
struct m0_fid pver
Definition: idx_dix.c:74
bool fo_local
Definition: fom.h:503
const char * rst_name
Definition: reqh_service.h:448
struct m0_be_domain * c_be_domain
Definition: service.c:313
m0_pool_nd_state
Definition: pool_machine.h:57
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
static int op_sync_wait(struct m0_fom *fom)
Definition: service.c:1138
M0_INTERNAL void m0_long_lock_link_fini(struct m0_long_lock_link *link)
Definition: fom_long_lock.c:76
M0_INTERNAL bool cas_in_ut(void)
Definition: cas.c:221
struct m0_fid rs_service_fid
Definition: reqh_service.h:221
uint32_t scf_nr_states
Definition: sm.h:354
struct m0_crv cr_ver
Definition: cas.h:228
M0_INTERNAL void m0_be_tx_credit_add(struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
Definition: tx_credit.c:44
M0_INTERNAL bool m0_rpc_item_max_payload_exceeded(struct m0_rpc_item *item, struct m0_rpc_session *session)
Definition: item.c:490
static int cas_ctidx_delete(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2323
static struct m0_rpc_at_buf * cas_out_complementary(enum m0_cas_opcode opc, const struct m0_cas_op *op, bool key, size_t opos)
Definition: service.c:805
M0_INTERNAL int m0_ctg_store_init(struct m0_be_domain *dom)
Definition: ctg_store.c:814
union m0_rpc_at_buf::@448 u
uint32_t cg_flags
Definition: cas.h:395
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
M0_BASSERT(M0_CAS_GET_FOP_OPCODE==CO_GET+M0_CAS_GET_FOP_OPCODE)
M0_INTERNAL void m0_fom_enthrall(struct m0_fom *leader, struct m0_fom *serf, struct m0_fom_thralldom *thrall, void(*end)(struct m0_fom_thralldom *thrall, struct m0_fom *serf))
static struct m0_stob_domain * dom
Definition: storage.c:38
M0_INTERNAL void m0_ctg_mark_deleted_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1805
static int next[]
Definition: cp.c:248
#define M0_BUF_INIT0
Definition: buf.h:71
static int cas_ctidx_insert(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2355
struct m0_xcode_type * m0_cas_op_xc
Definition: cas_xc.c:17
M0_INTERNAL int m0_xcode_obj_enc_to_buf(struct m0_xcode_obj *obj, void **buf, m0_bcount_t *len)
Definition: xcode.c:832
M0_INTERNAL void m0_fom_mod_rep_fill(struct m0_fop_mod_rep *rep, struct m0_fom *fom)
Definition: fom_generic.c:68
M0_INTERNAL int m0_rpc_at_get(const struct m0_rpc_at_buf *ab, struct m0_buf *buf)
Definition: at.c:399
Definition: cas.h:349
uint64_t cf_ikv_nr
Definition: service.c:346
static struct m0_sm_conf cas_sm_conf
Definition: service.c:512
struct m0_be_tx_credit tx_betx_cred
Definition: dtm.h:560
M0_INTERNAL const struct m0_fid_type m0_cctg_fid_type
Definition: cas.c:164
int co_rc
Definition: ctg_store.h:227
struct m0_cas_kv * cv_rec
Definition: cas.h:136
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
M0_INTERNAL const struct m0_fid_type * m0_fid_type_getfid(const struct m0_fid *fid)
Definition: fid.c:76
struct m0_ctg_op cf_ctg_op
Definition: service.c:325
M0_INTERNAL int m0_rpc_at_reply_rc(struct m0_rpc_at_buf *out)
Definition: at.c:583
static bool cas_service_started(struct m0_fop *fop, struct m0_reqh *reqh)
Definition: service.c:619
M0_INTERNAL void m0_cas_gc_fini(void)
Definition: index_gc.c:455
M0_INTERNAL struct m0_be_domain * m0_cas__ut_svc_be_get(struct m0_reqh_service *svc)
Definition: service.c:553
static int cas_at_reply(struct m0_rpc_at_buf *in, struct m0_rpc_at_buf *out, struct m0_buf *repbuf, struct m0_fom *fom, int next_phase)
Definition: service.c:716
uint32_t pd_sdev_idx
Definition: pool.h:437
#define M0_POST(cond)
M0_INTERNAL void m0_ctg_create_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1823
static const struct m0_fom_type_ops cas_fom_type_ops
Definition: service.c:511
Definition: reqh.h:94
uint32_t dl_type
Definition: layout.h:100
struct m0_cas_recv cg_rec
Definition: cas.h:295
M0_INTERNAL void m0_ctg_meta_cursor_init(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1722
Definition: dump.c:103
struct m0_fid ci_fid
Definition: cas.h:113
M0_INTERNAL void m0_cas_gc_wait_sync(void)
Definition: index_gc.c:591
static struct m0_cas_rec * cas_out_at(const struct m0_cas_rep *rep, int idx)
Definition: service.c:2135
M0_INTERNAL uint32_t m0_dix_fid_cctg_device_id(const struct m0_fid *cctg_fid)
Definition: fid_convert.c:81
static const struct m0_reqh_service_ops cas_service_ops
Definition: service.c:508
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
M0_INTERNAL void m0_ctg_cursor_init(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg)
Definition: ctg_store.c:1640
struct m0_fid p_fid
Definition: tx_desc.h:110
M0_INTERNAL void m0_ctg_op_get_ver(struct m0_ctg_op *ctg_op, struct m0_crv *out)
Definition: ctg_store.c:1582
static int cas_incoming_kv_setup(struct cas_fom *fom, const struct m0_cas_op *op)
Definition: service.c:766
static int cas_id_check(const struct m0_cas_id *cid)
Definition: service.c:1843
struct m0_cas_kv_vec cr_kv_bufs
Definition: cas.h:195
union m0_dix_layout::@145 u
struct m0_sm t_sm
Definition: tx.h:281
M0_INTERNAL int m0_buf_copy(struct m0_buf *dest, const struct m0_buf *src)
Definition: buf.c:104
static int cas_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: service.c:634
#define FID_P(f)
Definition: fid.h:77
#define PRId64
Definition: types.h:57
m0_cas_opcode
Definition: cas.h:347
void imask(void)
Definition: client_ut.c:141
Definition: cas.h:376
M0_INTERNAL bool m0_dix_imask_is_empty(const struct m0_dix_imask *mask)
Definition: imask.c:130
M0_INTERNAL void m0_ctg_op_init(struct m0_ctg_op *ctg_op, struct m0_fom *fom, uint32_t flags)
Definition: ctg_store.c:1759
M0_INTERNAL int m0_xcode_data_size(struct m0_xcode_ctx *ctx, const struct m0_xcode_obj *obj)
Definition: xcode.c:437
M0_INTERNAL void m0_cas_svc_fop_args(struct m0_sm_conf **sm_conf, const struct m0_fom_type_ops **fom_ops, struct m0_reqh_service_type **svctype)
Definition: service.c:536
int m0_reqh_service_async_start_simple(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.c:601
struct m0_fop * m0_fop_reply_alloc(struct m0_fop *req, struct m0_fop_type *rept)
Definition: fop.c:128
static enum m0_cas_opcode m0_cas_opcode(const struct m0_fop *fop)
Definition: service.c:1774
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
#define m0_forall(var, nr,...)
Definition: misc.h:112
struct m0_long_lock_addb2 cf_lock_addb2
Definition: service.c:362
Definition: fom.h:481
M0_INTERNAL struct m0_reqh_service_type m0_cas_service_type
Definition: service.c:2935
m0_cas_type
Definition: cas.h:362
static void cas_fom_fini(struct m0_fom *fom0)
Definition: service.c:1723
struct m0_dtm0_tx_pa * dtp_pa
Definition: tx_desc.h:117
static bool op_is_index_drop(enum m0_cas_opcode opc, enum m0_cas_type ct)
Definition: service.c:1129
static uint64_t cas_out_nr(const struct m0_fop *fop)
Definition: service.c:2018
M0_INTERNAL bool m0_dtm0_tx_desc_is_none(const struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:44
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
Definition: cas.h:354
struct m0_reqh reqh
Definition: rm_foms.c:48
const char * sd_name
Definition: sm.h:383
static int cas_done(struct cas_fom *fom, struct m0_cas_op *op, struct m0_cas_rep *rep, enum m0_cas_opcode opc)
Definition: service.c:2435
#define CID_IMASK_PTR(cid)
Definition: service.c:425
M0_INTERNAL void m0_ctg_cursor_kv_get(struct m0_ctg_op *ctg_op, struct m0_buf *key, struct m0_buf *val)
Definition: ctg_store.c:1708
static int cas_ctidx_lookup(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2342
int(* rsto_service_allocate)(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: reqh_service.h:436
#define M0_BUF_INIT_PTR(p)
Definition: buf.h:69
static int cas_kv_load_done(struct cas_fom *fom, enum m0_cas_opcode opc, const struct m0_cas_op *op, int phase)
Definition: service.c:2141
struct m0_cas_ctg ** cf_moved_ctgs
Definition: service.c:358
static int cas_fom_tick(struct m0_fom *fom0)
Definition: service.c:1160
struct m0_sm_state_descr * scf_state
Definition: sm.h:356
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
Definition: ext.h:37
Definition: fid.h:38
m0_fom_phase_outcome
Definition: fom.h:625
const struct m0_reqh_service_type * rs_type
Definition: reqh_service.h:228
struct m0_sm_trans_descr cas_fom_trans[]
Definition: service.c:2829
M0_INTERNAL void(* cas__ut_cb_done)(struct m0_fom *fom)
Definition: service.c:1720
static int cas_sdev_state(struct m0_poolmach *pm, uint32_t sdev_idx, enum m0_pool_nd_state *state_out)
Definition: service.c:1783
struct m0_be_tx tx_betx
Definition: dtm.h:559
#define M0_IS0(obj)
Definition: misc.h:70
struct m0_reqh_service * fo_service
Definition: fom.h:505
M0_INTERNAL void m0_fop_release(struct m0_ref *ref)
Definition: fop.c:147
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
static void cas_service_stop(struct m0_reqh_service *service)
Definition: service.c:589
M0_DTPS_PERSISTENT
Definition: tx_desc.h:171
Definition: cas.h:352
m0_dtm0_tx_pa_state
Definition: tx_desc.h:101
M0_INTERNAL void m0_be_dtm0_log_credit(enum m0_be_dtm0_log_credit_op op, struct m0_dtm0_tx_desc *txd, struct m0_buf *payload, struct m0_be_seg *seg, struct m0_dtm0_log_rec *rec, struct m0_be_tx_credit *accum)
Definition: dtm0_log.c:202
M0_INTERNAL int m0_xcode_obj_dec_from_buf(struct m0_xcode_obj *obj, void *buf, m0_bcount_t len)
Definition: xcode.c:850
M0_INTERNAL uint64_t m0_rpc_at_len(const struct m0_rpc_at_buf *ab)
Definition: at.c:709
struct m0_rpc_session * ri_session
Definition: item.h:147
static int cas_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom, int next_phase)
Definition: service.c:705
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
Definition: fom.c:1727
M0_INTERNAL void m0_ctg_ctidx_delete_credits(struct m0_cas_id *cid, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1971
struct m0_sm_trans_descr m0_generic_phases_trans[]
Definition: fom_generic.c:765
M0_INTERNAL int m0_ctg_meta_delete(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1392
#define _0C(exp)
Definition: assert.h:311
int cf_thrall_rc
Definition: service.c:360
static void cas_prep(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_ctg *ctg, uint64_t rec_pos, struct m0_be_tx_credit *accum)
Definition: service.c:2079
M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
Definition: at.c:441
struct m0_rpc_at_buf cr_key
Definition: cas.h:291
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:198
static const struct m0_fom_ops cas_fom_ops
Definition: service.c:510
static struct m0_fop * fop
Definition: item.c:57
struct m0_buf cf_out_key
Definition: service.c:368
struct m0_long_lock_link cf_lock
Definition: service.c:327
static int cas_op_recs_check(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_op *op)
Definition: service.c:907
static bool cas_key_need_to_send(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_op *op, uint64_t rec_pos)
Definition: service.c:939
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
struct m0_dtm0_tx_participants dtd_ps
Definition: tx_desc.h:122
static int cas_exec(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_ctg *ctg, uint64_t rec_pos, int next)
Definition: service.c:2161
M0_INTERNAL struct m0_cas_ctg * m0_ctg_ctidx(void)
Definition: ctg_store.c:2135
#define DTID0_P(__tid)
Definition: tx_desc.h:99
struct m0_sm fo_sm_phase
Definition: fom.h:522
static bool cas_is_valid(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, const struct m0_cas_rec *rec, uint64_t rec_pos)
Definition: service.c:1924
struct m0_fop * fo_rep_fop
Definition: fom.h:492
stats_kv
Definition: service.c:292
#define M0_XCODE_OBJ(type, ptr)
Definition: xcode.h:962
M0_INTERNAL struct m0_fop_type cas_put_fopt
Definition: cas.c:49
static int cas_device_check(const struct cas_fom *fom, const struct m0_cas_id *cid)
Definition: service.c:1811
M0_INTERNAL void m0_ctg_op_fini(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1788
static int cas_ctg_crow_fop_buf_prepare(const struct m0_cas_id *cid, struct m0_rpc_at_buf *at_buf)
Definition: service.c:2495
static int cas_dtm0_logrec_credit_add(struct m0_fom *fom0)
Definition: service.c:1023
static struct m0_cas_rec repv[N]
Definition: service_ut.c:65
static m0_bcount_t cas_rpc_cutoff(const struct cas_fom *fom)
Definition: service.c:2395
M0_INTERNAL void m0_cas_gc_start(struct m0_reqh_service *service)
Definition: index_gc.c:549
uint64_t cv_nr
Definition: cas.h:284
static struct m0_cas_op * cas_op(const struct m0_fom *fom)
Definition: service.c:1769
M0_INTERNAL void m0_long_read_unlock(struct m0_long_lock *lock, struct m0_long_lock_link *link)
Definition: nucleus.c:42
#define out(...)
Definition: gen.c:41
M0_INTERNAL bool m0_long_read_lock(struct m0_long_lock *lk, struct m0_long_lock_link *link, int next_phase)
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
bool cf_startkey_excluded
Definition: service.c:338
static bool cas_fid_is_cctg(const struct m0_fid *fid)
Definition: service.c:2042
M0_INTERNAL void m0_sm_conf_fini(struct m0_sm_conf *conf)
Definition: sm.c:376
M0_INTERNAL void * m0_alloc_aligned(size_t size, unsigned shift)
Definition: memory.c:168
static bool cas_ctidx_op_needed(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, uint64_t rec_pos)
Definition: service.c:2251
M0_INTERNAL int m0_dtm0_logrec_update(struct m0_be_dtm0_log *log, struct m0_be_tx *tx, struct m0_dtm0_tx_desc *txd, struct m0_buf *payload)
Definition: fop.c:286
M0_INTERNAL bool m0_fid_is_valid(const struct m0_fid *fid)
Definition: fid.c:96
#define DTID0_F
Definition: tx_desc.h:98
struct m0_long_lock_addb2 cf_meta_addb2
Definition: service.c:363
static void cas_service_prepare_to_stop(struct m0_reqh_service *svc)
Definition: service.c:583
bool cf_op_checked
Definition: service.c:336
struct m0_cas_recv cgr_rep
Definition: cas.h:304
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
struct m0_reqh * rs_reqh
Definition: reqh_service.h:260
static int cas_service_start(struct m0_reqh_service *service)
Definition: service.c:560
void m0_free(void *data)
Definition: memory.c:146
static void addb2_add_kv_attrs(const struct cas_fom *fom, enum stats_kv_io kv_io)
Definition: service.c:1081
uint64_t cr_rc
Definition: cas.h:221
struct m0_long_lock_link cf_meta
Definition: service.c:328
struct m0_rpc_item f_item
Definition: fop.h:84
const struct m0_reqh_service_type * ft_rstype
Definition: fom.h:617
Definition: cas.h:107
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
static void cas_incoming_kv(const struct cas_fom *fom, uint64_t rec_pos, struct m0_buf *key, struct m0_buf *val)
Definition: service.c:742
struct m0_pdclust_src_addr src
Definition: fd.c:108
uint32_t cg_flags
Definition: cas.h:302
struct m0_reqh_service dos_generic
Definition: service.h:45
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_ctg_meta_cursor_get(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1729
int(* rso_start_async)(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.h:342
#define ARRAY_SIZE(a)
Definition: misc.h:45
static int cas_ctg_crow_handle(struct cas_fom *fom, const struct m0_cas_id *cid)
Definition: service.c:2613
Definition: cas.h:363
M0_INTERNAL void(* cas__ut_cb_fini)(struct m0_fom *fom)
Definition: service.c:1721
Definition: fop.h:80
M0_INTERNAL int m0_ctg_op_rc(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1779
static int cas_ctidx_mem_place(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2271
struct m0_cas_id cg_id
Definition: cas.h:378
M0_INTERNAL struct m0_long_lock * m0_ctg_del_lock(void)
Definition: ctg_store.c:2157
M0_INTERNAL struct m0_reqh * m0_fom_reqh(const struct m0_fom *fom)
Definition: fom.c:283
Definition: trace.h:478
uint64_t cf_ipos
Definition: service.c:323
uint32_t ab_type
Definition: at.h:251
M0_INTERNAL struct m0_fop_type cas_rep_fopt
Definition: cas.c:52
Definition: tx.h:280
Definition: idx_mock.c:47
static size_t cas_fom_home_locality(const struct m0_fom *fom)
Definition: service.c:1762
M0_INTERNAL const char * m0_fom_phase_name(const struct m0_fom *fom, int phase)
Definition: fom.c:1722
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta_lookup_result(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1377
static void cas_fom_failure(struct cas_fom *fom, int rc, bool ctg_op_fini)
Definition: service.c:996
#define M0_IMPOSSIBLE(fmt,...)
static int cas_val_send(struct cas_fom *fom, const struct m0_cas_op *op, enum m0_cas_opcode opc, const struct m0_cas_rep *rep, enum cas_fom_phase next_phase)
Definition: service.c:884
static struct m0_sm_state_descr cas_fom_phases[]
Definition: service.c:513