Motr  M0
client.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CAS
24 
25 #include "lib/trace.h"
26 #include "lib/vec.h"
27 #include "lib/misc.h" /* M0_IN */
28 #include "lib/memory.h"
29 #include "sm/sm.h"
30 #include "fid/fid.h" /* m0_fid */
31 #include "rpc/item.h"
32 #include "rpc/rpc.h" /* m0_rpc_post */
33 #include "rpc/session.h" /* m0_rpc_session */
34 #include "rpc/conn.h" /* m0_rpc_conn */
35 #include "fop/fop.h"
36 #include "fop/fom_generic.h"
37 #include "cas/cas.h"
38 #include "cas/cas_xc.h"
39 #include "cas/client.h"
40 #include "lib/finject.h"
41 #include "cas/cas_addb2.h"
42 #include "dtm0/dtx.h" /* struct m0_dtm0_dtx */
43 #include "dix/layout.h" /* m0_dix_ldesc_copy() */
44 
61 struct creq_niter {
65 
69  uint64_t cni_req_i;
70  uint64_t cni_rep_i;
71  uint64_t cni_kpos;
72 };
73 
74 #define CASREQ_FOP_DATA(fop) ((struct m0_cas_op *)m0_fop_data(fop))
75 
76 static void cas_req_replied_cb(struct m0_rpc_item *item);
77 
78 static const struct m0_rpc_item_ops cas_item_ops = {
79  .rio_sent = NULL,
80  .rio_replied = cas_req_replied_cb
81 };
82 
83 static void creq_asmbl_replied_cb(struct m0_rpc_item *item);
84 
85 static const struct m0_rpc_item_ops asmbl_item_ops = {
86  .rio_sent = NULL,
87  .rio_replied = creq_asmbl_replied_cb
88 };
89 
90 static struct m0_sm_state_descr cas_req_states[] = {
91  [CASREQ_INIT] = {
93  .sd_name = "init",
94  .sd_allowed = M0_BITS(CASREQ_SENT, CASREQ_FRAGM_SENT)
95  },
96  [CASREQ_SENT] = {
97  .sd_name = "request-sent",
98  .sd_allowed = M0_BITS(CASREQ_FINAL, CASREQ_FAILURE,
100  },
101  [CASREQ_FRAGM_SENT] = {
102  .sd_name = "request-fragment-sent",
103  .sd_allowed = M0_BITS(CASREQ_FINAL, CASREQ_FAILURE,
105  },
106  [CASREQ_ASSEMBLY] = {
107  .sd_name = "assembly",
108  .sd_allowed = M0_BITS(CASREQ_FRAGM_SENT, CASREQ_FINAL,
110  },
111  [CASREQ_FINAL] = {
112  .sd_name = "final",
113  .sd_flags = M0_SDF_TERMINAL,
114  },
115  [CASREQ_FAILURE] = {
116  .sd_name = "failure",
117  .sd_flags = M0_SDF_TERMINAL | M0_SDF_FAILURE
118  }
119 };
120 
121 static struct m0_sm_trans_descr cas_req_trans[] = {
122  { "send-over-rpc", CASREQ_INIT, CASREQ_SENT },
123  { "send-fragm-over-rpc", CASREQ_INIT, CASREQ_FRAGM_SENT },
124  { "rpc-failure", CASREQ_SENT, CASREQ_FAILURE },
125  { "assembly", CASREQ_SENT, CASREQ_ASSEMBLY },
126  { "req-processed", CASREQ_SENT, CASREQ_FINAL },
127  { "rpc-failure", CASREQ_FRAGM_SENT, CASREQ_FAILURE },
128  { "fragm-assembly", CASREQ_FRAGM_SENT, CASREQ_ASSEMBLY },
129  { "req-processed", CASREQ_FRAGM_SENT, CASREQ_FINAL },
130  { "assembly-fail", CASREQ_ASSEMBLY, CASREQ_FAILURE },
131  { "assembly-done", CASREQ_ASSEMBLY, CASREQ_FINAL },
132  { "assembly-fragm", CASREQ_ASSEMBLY, CASREQ_FRAGM_SENT },
133 };
134 
136  .scf_name = "cas_req",
137  .scf_nr_states = ARRAY_SIZE(cas_req_states),
138  .scf_state = cas_req_states,
139  .scf_trans_nr = ARRAY_SIZE(cas_req_trans),
140  .scf_trans = cas_req_trans
141 };
142 
143 static int cas_req_fragmentation(struct m0_cas_req *req);
144 static int cas_req_fragment_continue(struct m0_cas_req *req,
145  struct m0_cas_op *op);
146 static void creq_recv_fini(struct m0_cas_recv *recv, bool op_is_meta);
147 
148 static void cas_to_rpc_map(const struct m0_cas_req *creq,
149  const struct m0_rpc_item *item)
150 {
151  uint64_t cid = m0_sm_id_get(&creq->ccr_sm);
152  uint64_t iid = m0_sm_id_get(&item->ri_sm);
153  M0_ADDB2_ADD(M0_AVI_CAS_TO_RPC, cid, iid);
154 }
155 
156 
157 static bool fid_is_meta(struct m0_fid *fid)
158 {
159  M0_PRE(fid != NULL);
160  return m0_fid_eq(fid, &m0_cas_meta_fid);
161 }
162 
163 static int creq_op_alloc(uint64_t recs_nr,
164  struct m0_cas_op **out)
165 {
166  struct m0_cas_op *op;
167  struct m0_cas_rec *rec;
168 
169  M0_PRE(recs_nr > 0);
170  if (M0_FI_ENABLED("cas_alloc_fail"))
171  return M0_ERR(-ENOMEM);
172 
173  M0_ALLOC_PTR(op);
174  M0_ALLOC_ARR(rec, recs_nr);
175  if (op == NULL || rec == NULL) {
176  m0_free(op);
177  m0_free(rec);
178  return M0_ERR(-ENOMEM);
179  } else {
180  op->cg_rec.cr_nr = recs_nr;
181  op->cg_rec.cr_rec = rec;
182  m0_dtm0_tx_desc_init_none(&op->cg_txd);
183  *out = op;
184  }
185  return M0_RC(0);
186 }
187 
188 static void creq_op_free(struct m0_cas_op *op)
189 {
190  if (op != NULL) {
191  m0_dtm0_tx_desc_fini(&op->cg_txd);
192  m0_cas_id_fini(&op->cg_id);
193  m0_free(op->cg_rec.cr_rec);
194  m0_free(op);
195  }
196 }
197 
198 M0_INTERNAL void m0_cas_req_init(struct m0_cas_req *req,
199  struct m0_rpc_session *sess,
200  struct m0_sm_group *grp)
201 {
202  M0_ENTRY();
203  M0_PRE(sess != NULL);
204  M0_PRE(M0_IS0(req));
205  req->ccr_sess = sess;
207  m0_sm_addb2_counter_init(&req->ccr_sm);
208  M0_LEAVE();
209 }
210 
211 static struct m0_rpc_conn *creq_rpc_conn(const struct m0_cas_req *req)
212 {
213  return req->ccr_sess->s_conn;
214 }
215 
216 static struct m0_rpc_machine *creq_rpc_mach(const struct m0_cas_req *req)
217 {
219 }
220 
221 static struct m0_sm_group *cas_req_smgrp(const struct m0_cas_req *req)
222 {
223  return req->ccr_sm.sm_grp;
224 }
225 
226 M0_INTERNAL void m0_cas_req_lock(struct m0_cas_req *req)
227 {
228  M0_ENTRY();
230 }
231 
232 M0_INTERNAL void m0_cas_req_unlock(struct m0_cas_req *req)
233 {
234  M0_ENTRY();
236 }
237 
238 M0_INTERNAL bool m0_cas_req_is_locked(const struct m0_cas_req *req)
239 {
241 }
242 
243 static void cas_req_state_set(struct m0_cas_req *req,
244  enum m0_cas_req_state state)
245 {
246  M0_LOG(M0_DEBUG, "CAS req: %p, state change:[%s -> %s]\n",
247  req, m0_sm_state_name(&req->ccr_sm, req->ccr_sm.sm_state),
248  m0_sm_state_name(&req->ccr_sm, state));
249  m0_sm_state_set(&req->ccr_sm, state);
250 }
251 
252 static void cas_req_reply_fini(struct m0_cas_req *req)
253 {
254  struct m0_cas_recv *recv = &req->ccr_reply.cgr_rep;
255  uint64_t i;
256 
257  for (i = 0; i < recv->cr_nr; i++) {
258  m0_rpc_at_fini(&recv->cr_rec[i].cr_key);
259  m0_rpc_at_fini(&recv->cr_rec[i].cr_val);
260  }
261  m0_free(req->ccr_reply.cgr_rep.cr_rec);
262 }
263 
264 static void cas_req_fini(struct m0_cas_req *req)
265 {
266  uint32_t cur_state = req->ccr_sm.sm_state;
267 
268  M0_ENTRY();
270  M0_PRE(M0_IN(cur_state, (CASREQ_INIT, CASREQ_FINAL, CASREQ_FAILURE)));
271  if (cur_state == CASREQ_FAILURE) {
272  if (req->ccr_reply_item != NULL)
273  m0_rpc_item_put_lock(req->ccr_reply_item);
274  if (req->ccr_fop != NULL) {
275  req->ccr_fop->f_data.fd_data = NULL;
276  m0_fop_put_lock(req->ccr_fop);
277  }
278  }
279  if (req->ccr_req_op != NULL) {
280  /* Restore records vector for proper freeing. */
281  req->ccr_req_op->cg_rec = req->ccr_rec_orig;
282  creq_recv_fini(&req->ccr_rec_orig, req->ccr_is_meta);
283  creq_op_free(req->ccr_req_op);
284  }
286  m0_free(req->ccr_asmbl_ikeys);
287  m0_sm_fini(&req->ccr_sm);
288  M0_LEAVE();
289 }
290 
291 M0_INTERNAL void m0_cas_req_fini(struct m0_cas_req *req)
292 {
294  cas_req_fini(req);
295  M0_SET0(req);
296 }
297 
298 M0_INTERNAL void m0_cas_req_fini_lock(struct m0_cas_req *req)
299 {
300  M0_ENTRY();
302  cas_req_fini(req);
304  M0_SET0(req);
305  M0_LEAVE();
306 }
307 
313 static void creq_kv_hold_down(struct m0_cas_rec *rec)
314 {
315  struct m0_rpc_at_buf *key = &rec->cr_key;
316  struct m0_rpc_at_buf *val = &rec->cr_val;
317 
318  M0_ENTRY();
320  M0_IN(key->ab_type, (M0_RPC_AT_INLINE, M0_RPC_AT_BULK_SEND)));
321  if (m0_rpc_at_is_set(key))
323  if (m0_rpc_at_is_set(val))
325  M0_LEAVE();
326 }
327 
332 static void creq_recv_fini(struct m0_cas_recv *recv, bool op_is_meta)
333 {
334  struct m0_cas_rec *rec;
335  struct m0_cas_kv *kv;
336  uint64_t i;
337  uint64_t k;
338 
339  for (i = 0; i < recv->cr_nr; i++) {
340  rec = &recv->cr_rec[i];
341  /*
342  * CAS client does not copy keys/values provided by user if
343  * it works with non-meta index, otherwise it encodes keys
344  * and places them in buffers allocated by itself.
345  * Save keys/values in memory in the first case, free in
346  * the second.
347  */
348  if (!op_is_meta)
349  creq_kv_hold_down(rec);
350  m0_rpc_at_fini(&rec->cr_key);
351  m0_rpc_at_fini(&rec->cr_val);
352  for (k = 0; k < rec->cr_kv_bufs.cv_nr; k++) {
353  kv = &rec->cr_kv_bufs.cv_rec[k];
354  m0_rpc_at_fini(&kv->ck_key);
355  m0_rpc_at_fini(&kv->ck_val);
356  }
357  }
358 
359 }
360 
361 static void creq_fop_destroy(struct m0_cas_req *req)
362 {
363  struct m0_fop *fop = req->ccr_fop;
364 
365  fop->f_data.fd_data = NULL;
366  m0_fop_fini(fop);
367  m0_free(fop);
368  req->ccr_fop = NULL;
369 }
370 
371 static void creq_fop_release(struct m0_ref *ref)
372 {
373  struct m0_fop *fop;
374 
375  M0_ENTRY();
376  M0_PRE(ref != NULL);
377  fop = container_of(ref, struct m0_fop, f_ref);
378  m0_fop_fini(fop);
379  m0_free(fop);
380  M0_LEAVE();
381 }
382 
383 static void creq_asmbl_fop_release(struct m0_ref *ref)
384 {
385  struct m0_fop *fop;
386  struct m0_cas_op *op;
387 
388  M0_ENTRY();
389  M0_PRE(ref != NULL);
390  fop = container_of(ref, struct m0_fop, f_ref);
392  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
393  m0_fop_fini(fop);
394  M0_LEAVE();
395 }
396 
397 static int creq_fop_create(struct m0_cas_req *req,
398  struct m0_fop_type *ftype,
399  struct m0_cas_op *op)
400 {
401  struct m0_fop *fop;
402  M0_ENTRY();
403 
404  M0_ALLOC_PTR(fop);
405  if (fop == NULL)
406  return M0_ERR(-ENOMEM);
407 
408  m0_fop_init(fop, ftype, (void *)op, creq_fop_release);
409  fop->f_opaque = req;
410  req->ccr_fop = fop;
411  req->ccr_ftype = ftype;
412 
413  M0_LEAVE("cas_req=%p fop=%p", req, fop);
414  return 0;
415 }
416 
418  struct m0_fop_type *ftype,
419  struct m0_cas_op *op,
420  enum m0_cas_req_state *next_state)
421 {
422  int rc;
423  M0_ENTRY();
424 
425  rc = creq_fop_create(req, ftype, op);
426  if (rc == 0) {
427  *next_state = CASREQ_SENT;
428  /*
429  * Check whether original fop payload does not exceed
430  * max rpc item payload.
431  */
433  &req->ccr_fop->f_item,
434  req->ccr_sess)) {
435  *next_state = CASREQ_FRAGM_SENT;
437  }
438  if (rc != 0)
440  }
441  return M0_RC(rc);
442 }
443 
445 {
446  struct m0_fop *fop = M0_AMB(fop, item, f_item);
447  return (struct m0_cas_req *)fop->f_opaque;
448 }
449 
450 static struct m0_rpc_item *cas_req_to_item(const struct m0_cas_req *req)
451 {
452  return &req->ccr_fop->f_item;
453 }
454 
455 static struct m0_cas_rep *cas_rep(struct m0_rpc_item *reply)
456 {
458 }
459 
460 M0_INTERNAL int m0_cas_req_generic_rc(const struct m0_cas_req *req)
461 {
462  struct m0_fop *req_fop = req->ccr_fop;
463  struct m0_rpc_item *reply;
464  int rc;
465 
466  M0_PRE(M0_IN(req->ccr_sm.sm_state, (CASREQ_FINAL, CASREQ_FAILURE)));
467 
468  reply = req_fop != NULL ? cas_req_to_item(req)->ri_reply : NULL;
469 
470  rc = req->ccr_sm.sm_rc;
471  if (rc == 0 && reply != NULL)
473  if (rc == 0)
474  rc = req->ccr_reply.cgr_rc;
475 
476  return M0_RC(rc);
477 }
478 
480  struct m0_fid *idx_fid)
481 {
482  return m0_rpc_at_is_set(val) == !fid_is_meta(idx_fid);
483 }
484 
485 static int cas_rep__validate(const struct m0_fop_type *ftype,
486  struct m0_cas_op *op,
487  struct m0_cas_rep *rep)
488 {
489  struct m0_cas_rec *rec;
490  uint64_t sum;
491  uint64_t i;
492 
493  if (ftype == &cas_cur_fopt) {
494  sum = 0;
495  for (i = 0; i < op->cg_rec.cr_nr; i++)
496  sum += op->cg_rec.cr_rec[i].cr_rc;
497  if (rep->cgr_rep.cr_nr > sum)
498  return M0_ERR(-EPROTO);
499  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
500  rec = &rep->cgr_rep.cr_rec[i];
501  if ((int32_t)rec->cr_rc > 0 &&
502  (!m0_rpc_at_is_set(&rec->cr_key) ||
504  &op->cg_id.ci_fid)))
505  rec->cr_rc = M0_ERR(-EPROTO);
506  }
507  } else {
508  M0_ASSERT(M0_IN(ftype, (&cas_get_fopt, &cas_put_fopt,
509  &cas_del_fopt)));
510  /*
511  * CAS service guarantees equal number of records in request and
512  * response for GET,PUT, DEL operations. Otherwise, it's not
513  * possible to match requested records with the ones in reply,
514  * because keys in reply are absent.
515  */
516  if (op->cg_rec.cr_nr != rep->cgr_rep.cr_nr)
517  return M0_ERR(-EPROTO);
518  /*
519  * Successful GET reply for ordinary index should always contain
520  * non-empty value.
521  */
522  if (ftype == &cas_get_fopt)
523  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
524  rec = &rep->cgr_rep.cr_rec[i];
525  if (rec->cr_rc == 0 &&
527  &op->cg_id.ci_fid))
528  rec->cr_rc = M0_ERR(-EPROTO);
529  }
530  }
531  return M0_RC(0);
532 }
533 
534 static int cas_rep_validate(const struct m0_cas_req *req)
535 {
536  const struct m0_fop *rfop = req->ccr_fop;
537  struct m0_cas_op *op = m0_fop_data(rfop);
538  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
539 
540  return rep->cgr_rc ?: cas_rep__validate(rfop->f_type, op, rep);
541 }
542 
543 static void cas_req_failure(struct m0_cas_req *req, int32_t rc)
544 {
545  M0_PRE(rc != 0);
546  m0_sm_fail(&req->ccr_sm, CASREQ_FAILURE, rc);
547 }
548 
549 static void cas_req_failure_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
550 {
551  struct m0_cas_req *req = container_of(ast, struct m0_cas_req,
553  int32_t rc = (long)ast->sa_datum;
554 
555  M0_PRE(rc != 0);
557 }
558 
559 static void cas_req_failure_ast_post(struct m0_cas_req *req, int32_t rc)
560 {
561  M0_ENTRY();
562  req->ccr_failure_ast.sa_cb = cas_req_failure_ast;
563  req->ccr_failure_ast.sa_datum = (void *)(long)rc;
564  m0_sm_ast_post(cas_req_smgrp(req), &req->ccr_failure_ast);
565  M0_LEAVE();
566 }
567 
568 static void creq_item_prepare(const struct m0_cas_req *req,
569  struct m0_rpc_item *item,
570  const struct m0_rpc_item_ops *ops)
571 {
573  item->ri_ops = ops;
574  item->ri_session = req->ccr_sess;
577 }
578 
579 static void cas_fop_send(struct m0_cas_req *req)
580 {
581  struct m0_cas_op *op = m0_fop_data(req->ccr_fop);
582  struct m0_rpc_item *item;
583  int rc;
584 
585  M0_ENTRY();
587  req->ccr_sent_recs_nr += op->cg_rec.cr_nr;
590  rc = m0_rpc_post(item);
592  M0_LOG(M0_NOTICE, "RPC post returned %d", rc);
593 }
594 
595 static int creq_kv_buf_add(const struct m0_cas_req *req,
596  const struct m0_bufvec *kv,
597  uint32_t idx,
598  struct m0_rpc_at_buf *buf)
599 {
600  M0_PRE(req != NULL);
601  M0_PRE(kv != NULL);
602  M0_PRE(buf != NULL);
603  M0_PRE(idx < kv->ov_vec.v_nr);
604 
605  return m0_rpc_at_add(buf, &M0_BUF_INIT(kv->ov_vec.v_count[idx],
606  kv->ov_buf[idx]),
607  creq_rpc_conn(req));
608 }
609 
610 static void creq_asmbl_fop_init(struct m0_cas_req *req,
611  struct m0_fop_type *ftype,
612  struct m0_cas_op *op)
613 {
614  m0_fop_init(&req->ccr_asmbl_fop, ftype, (void *)op,
616 }
617 
626 static int greq_asmbl_add(struct m0_cas_req *req,
627  struct m0_cas_rec *rec,
628  uint64_t idx,
629  uint64_t orig_idx,
630  uint64_t vlen)
631 {
632  int rc;
633 
634  m0_rpc_at_init(&rec->cr_key);
635  m0_rpc_at_init(&rec->cr_val);
636  rc = creq_kv_buf_add(req, req->ccr_keys, orig_idx, &rec->cr_key) ?:
637  m0_rpc_at_recv(&rec->cr_val, creq_rpc_conn(req), vlen, true);
638  if (rc == 0) {
639  req->ccr_asmbl_ikeys[idx] = orig_idx;
640  } else {
641  m0_rpc_at_detach(&rec->cr_key);
642  m0_rpc_at_fini(&rec->cr_key);
643  m0_rpc_at_fini(&rec->cr_val);
644  }
645  return M0_RC(rc);
646 }
647 
651 static uint64_t greq_asmbl_count(const struct m0_cas_req *req)
652 {
653  const struct m0_fop *rfop = req->ccr_fop;
654  struct m0_cas_op *req_op = m0_fop_data(rfop);
655  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
656  struct m0_cas_rec *rcvd;
657  struct m0_cas_rec *sent;
658  struct m0_buf buf;
659  uint64_t len;
660  uint64_t ret = 0;
661  uint64_t i;
662  int rc;
663 
664  M0_PRE(rfop->f_type == &cas_get_fopt);
665 
666  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
667  rcvd = &rep->cgr_rep.cr_rec[i];
668  sent = &req_op->cg_rec.cr_rec[i];
669  rc = m0_rpc_at_rep_get(&sent->cr_val, &rcvd->cr_val, &buf);
670  if (rc != 0 && m0_rpc_at_rep_is_bulk(&rcvd->cr_val, &len))
671  ret++;
672  }
673  return ret;
674 }
675 
676 static int greq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
677 {
678  const struct m0_fop *rfop = req->ccr_fop;
679  struct m0_cas_op *req_op = m0_fop_data(rfop);
680  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
681  struct m0_cas_rec *rcvd;
682  struct m0_cas_rec *sent;
683  struct m0_buf buf;
684  struct m0_cas_recv *recv;
685  uint64_t len;
686  uint64_t i;
687  uint64_t k = 0;
688  int rc = 0;
689 
690  M0_PRE(op != NULL);
691  M0_PRE(rfop->f_type == &cas_get_fopt);
692 
693  recv = &op->cg_rec;
694  op->cg_id = req_op->cg_id;
695 
696  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
697  rcvd = &rep->cgr_rep.cr_rec[i];
698  sent = &req_op->cg_rec.cr_rec[i];
699  rc = m0_rpc_at_rep_get(&sent->cr_val, &rcvd->cr_val, &buf);
700  if (rc != 0 && m0_rpc_at_rep_is_bulk(&rcvd->cr_val, &len)) {
701  M0_PRE(k < recv->cr_nr);
702  rc = greq_asmbl_add(req, &recv->cr_rec[k], k, i, len);
703  if (rc != 0)
704  goto err;
705  k++;
706  }
707  }
708 
709 err:
710  if (rc != 0) {
711  /* Finalise all already initialised records. */
712  recv->cr_nr = k;
713  creq_recv_fini(recv, fid_is_meta(&op->cg_id.ci_fid));
714  }
715  return M0_RC(rc);
716 }
717 
718 static bool greq_asmbl_post(struct m0_cas_req *req)
719 {
720  struct m0_cas_op *op = NULL;
721  struct m0_rpc_item *item = &req->ccr_asmbl_fop.f_item;
722  uint64_t asmbl_count;
723  bool ret = false;
724  int rc = 0;
725 
726  asmbl_count = greq_asmbl_count(req);
727  if (asmbl_count > 0) {
728  M0_ALLOC_ARR(req->ccr_asmbl_ikeys, asmbl_count);
729  if (req->ccr_asmbl_ikeys == NULL) {
730  rc = M0_ERR(-ENOMEM);
731  goto err;
732  }
733  rc = creq_op_alloc(asmbl_count, &op) ?:
735  if (rc == 0) {
738  rc = m0_rpc_post(item);
740  if (rc != 0)
741  m0_fop_put_lock(&req->ccr_asmbl_fop);
742  else
743  ret = true;
744  }
745  }
746 err:
747  if (rc != 0) {
748  m0_free(req->ccr_asmbl_ikeys);
749  creq_op_free(op);
750  }
751  return ret;
752 }
753 
754 static bool creq_niter_invariant(struct creq_niter *it)
755 {
756  return it->cni_req_i <= it->cni_reqv->cr_nr &&
757  it->cni_rep_i <= it->cni_repv->cr_nr;
758 }
759 
760 static void creq_niter_init(struct creq_niter *it,
761  struct m0_cas_op *op,
762  struct m0_cas_rep *rep)
763 {
764  it->cni_reqv = &op->cg_rec;
765  it->cni_repv = &rep->cgr_rep;
766  it->cni_req_i = 0;
767  it->cni_rep_i = 0;
768  it->cni_kpos = -1;
769  it->cni_req = NULL;
770  it->cni_rep = NULL;
771 }
772 
773 static int creq_niter_next(struct creq_niter *it)
774 {
775  int rc = 0;
776 
778  if (it->cni_rep_i == it->cni_repv->cr_nr)
779  rc = -ENOENT;
780 
781  if (rc == 0) {
782  it->cni_rep = &it->cni_repv->cr_rec[it->cni_rep_i++];
783  it->cni_kpos++;
784  if (it->cni_rep->cr_rc == 1 ||
785  ((int32_t)it->cni_rep->cr_rc <= 0 && it->cni_req_i == 0) ||
786  it->cni_kpos == it->cni_req->cr_rc) {
788  M0_PRE(it->cni_req_i < it->cni_reqv->cr_nr);
789  it->cni_req = &it->cni_reqv->cr_rec[it->cni_req_i];
790  it->cni_req_i++;
791  it->cni_kpos = 0;
792  }
793  }
794 
796  return M0_RC(rc);
797 }
798 
799 static void creq_niter_fini(struct creq_niter *it)
800 {
801  M0_SET0(it);
802 }
803 
808 static int nreq_asmbl_prep(struct m0_cas_req *req, struct m0_cas_op *op)
809 {
810  struct m0_cas_op *orig = m0_fop_data(req->ccr_fop);
811  struct m0_cas_rec *rec;
812  uint64_t i;
813  int rc = 0;
814 
815  M0_PRE(op->cg_rec.cr_nr == orig->cg_rec.cr_nr);
816  op->cg_id = orig->cg_id;
817  for (i = 0; i < orig->cg_rec.cr_nr; i++) {
818  rec = &op->cg_rec.cr_rec[i];
819  M0_ASSERT(M0_IS0(rec));
820  rec->cr_rc = orig->cg_rec.cr_rec[i].cr_rc;
821  m0_rpc_at_init(&rec->cr_key);
822  rc = creq_kv_buf_add(req, req->ccr_keys, i, &rec->cr_key);
823  if (rc != 0)
824  goto err;
825  M0_ALLOC_ARR(rec->cr_kv_bufs.cv_rec, rec->cr_rc);
826  if (op->cg_rec.cr_rec[i].cr_kv_bufs.cv_rec == NULL) {
827  rc = M0_ERR(-ENOMEM);
828  goto err;
829  }
830  }
831 err:
832  if (rc != 0) {
833  /* Finalise all already initialised records. */
834  op->cg_rec.cr_nr = i + 1;
835  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
836  }
837 
838  return M0_RC(rc);
839 }
840 
841 static int nreq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
842 {
843  const struct m0_fop *rfop = req->ccr_fop;
844  struct m0_cas_rep *reply = cas_rep(cas_req_to_item(req)->ri_reply);
845  struct m0_cas_rec *rep;
846  struct m0_cas_rec *rec;
847  struct m0_rpc_at_buf *key;
848  struct m0_rpc_at_buf *val;
849  struct creq_niter iter;
850  uint64_t klen;
851  uint64_t vlen;
852  bool bulk_key;
853  bool bulk_val;
854  int rc = 0;
855  uint64_t i;
856 
857  M0_PRE(rfop->f_type == &cas_cur_fopt);
858 
859  rc = nreq_asmbl_prep(req, op);
860  if (rc != 0)
861  return M0_ERR(rc);
862  /*
863  * 'op' is a copy of original request relative to starting keys,
864  * so iterator will iterate over 'op' the same way as with original
865  * request.
866  */
867  creq_niter_init(&iter, op, reply);
868  while (creq_niter_next(&iter) != -ENOENT) {
869  rep = iter.cni_rep;
870  rec = iter.cni_req;
871  i = rec->cr_kv_bufs.cv_nr;
872  bulk_key = m0_rpc_at_rep_is_bulk(&rep->cr_key, &klen);
873  bulk_val = m0_rpc_at_rep_is_bulk(&rep->cr_val, &vlen);
874  key = &rec->cr_kv_bufs.cv_rec[i].ck_key;
875  val = &rec->cr_kv_bufs.cv_rec[i].ck_val;
878  rc = m0_rpc_at_recv(val, creq_rpc_conn(req), vlen, bulk_val) ?:
879  m0_rpc_at_recv(key, creq_rpc_conn(req), klen, bulk_key);
880  if (rc == 0) {
881  rec->cr_kv_bufs.cv_nr++;
882  } else {
885  break;
886  }
887  }
888  creq_niter_fini(&iter);
889 
890  if (rc != 0)
891  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
892  return M0_RC(rc);
893 }
894 
895 static bool nreq_asmbl_post(struct m0_cas_req *req)
896 {
897  const struct m0_fop *rfop = req->ccr_fop;
898  struct m0_cas_op *req_op = m0_fop_data(rfop);
899  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
900  struct m0_rpc_item *item = &req->ccr_asmbl_fop.f_item;
901  struct m0_cas_rec *rcvd;
902  struct m0_cas_rec *sent;
903  bool bulk = false;
904  struct m0_buf buf;
905  struct m0_cas_op *op;
906  int rc;
907  uint64_t len;
908  struct creq_niter iter;
909 
910  creq_niter_init(&iter, req_op, rep);
911  while ((rc = creq_niter_next(&iter)) != -ENOENT) {
912  rcvd = iter.cni_rep;
913  sent = iter.cni_req;
914  M0_ASSERT(sent->cr_kv_bufs.cv_nr == 0);
915  rc = m0_rpc_at_rep_get(NULL, &rcvd->cr_key, &buf) ?:
916  m0_rpc_at_rep_get(NULL, &rcvd->cr_val, &buf);
917  if (rc != 0 && !bulk)
918  bulk = m0_rpc_at_rep_is_bulk(&rcvd->cr_key, &len) ||
919  m0_rpc_at_rep_is_bulk(&rcvd->cr_val, &len);
920  }
921  creq_niter_fini(&iter);
922 
923  /*
924  * If at least one key/value requires bulk transmission, then
925  * resend the whole request, requesting bulk transmission as necessary.
926  */
927  if (bulk) {
928  rc = creq_op_alloc(req_op->cg_rec.cr_nr, &op);
929  if (rc == 0) {
930  rc = nreq_asmbl_fill(req, op);
931  if (rc == 0) {
934  rc = m0_rpc_post(item);
936  if (rc != 0)
937  m0_fop_put_lock(&req->ccr_asmbl_fop);
938  } else {
939  creq_op_free(op);
940  bulk = false;
941  }
942  }
943  }
944  return bulk;
945 }
946 
947 static void creq_rep_override(struct m0_cas_rec *orig,
948  struct m0_cas_rec *new)
949 {
950  M0_ENTRY();
951  m0_rpc_at_fini(&orig->cr_key);
952  m0_rpc_at_fini(&orig->cr_val);
953  *orig = *new;
954  /*
955  * Key/value data buffers are now attached to both records.
956  * Detach buffers from 'new' record to avoid double free.
957  */
958  m0_rpc_at_detach(&new->cr_key);
959  m0_rpc_at_detach(&new->cr_val);
960  M0_LEAVE();
961 }
962 
963 static void nreq_asmbl_accept(struct m0_cas_req *req)
964 {
965  struct m0_fop *fop = &req->ccr_asmbl_fop;
966  struct m0_rpc_item *item = &fop->f_item;
967  struct m0_cas_rep *crep = cas_rep(cas_req_to_item(req)->ri_reply);
969  item->ri_reply));
970  struct m0_cas_rec *rcvd;
971  struct m0_cas_rec *sent;
972  struct m0_cas_op *op = m0_fop_data(fop);
973  struct m0_cas_kv *kv;
974  struct creq_niter iter;
975  uint64_t i;
976  int rc;
977 
978  i = 0;
979  creq_niter_init(&iter, op, rep);
980  while (creq_niter_next(&iter) != -ENOENT) {
981  rcvd = iter.cni_rep;
982  sent = iter.cni_req;
983  if ((int32_t)rcvd->cr_rc > 0) {
985  M0_PRE(rcvd->cr_rc <= sent->cr_kv_bufs.cv_nr);
986  kv = &sent->cr_kv_bufs.cv_rec[rcvd->cr_rc - 1];
987  rc = m0_rpc_at_rep2inline(&kv->ck_key, &rcvd->cr_key) ?:
988  m0_rpc_at_rep2inline(&kv->ck_val, &rcvd->cr_val);
989  if (rc == 0)
991  rcvd);
992  }
993  i++;
994  }
995  creq_niter_fini(&iter);
996 }
997 
998 static void greq_asmbl_accept(struct m0_cas_req *req)
999 {
1000  struct m0_fop *fop = &req->ccr_asmbl_fop;
1001  struct m0_rpc_item *item = &fop->f_item;
1002  struct m0_cas_rep *crep = cas_rep(cas_req_to_item(req)->ri_reply);
1004  item->ri_reply));
1005  struct m0_cas_rec *rec;
1006  struct m0_cas_op *op = m0_fop_data(fop);
1007  uint64_t i;
1008  uint64_t orig_i;
1009  int rc;
1010 
1011  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
1012  rec = &rep->cgr_rep.cr_rec[i];
1013  if (rec->cr_rc == 0) {
1014  rc = m0_rpc_at_rep2inline(&op->cg_rec.cr_rec[i].cr_val,
1015  &rec->cr_val);
1016  if (rc == 0) {
1017  orig_i = req->ccr_asmbl_ikeys[i];
1018  creq_rep_override(&crep->cgr_rep.cr_rec[orig_i],
1019  rec);
1020  }
1021  }
1022  }
1023 }
1024 
1030 {
1031  struct m0_cas_rep *rep;
1032 
1033  M0_PRE(req != NULL);
1034  M0_PRE(req->ccr_fop != NULL);
1035 
1036  rep = cas_rep(cas_req_to_item(req)->ri_reply);
1037  M0_ASSERT(rep != NULL);
1038 
1039  req->ccr_remid = rep->cgr_mod_rep.fmr_remid;
1040 }
1041 
1043  bool *fragm_continue)
1044 {
1045  struct m0_cas_rep *reply = &req->ccr_reply;
1046  struct m0_cas_rep *rcvd_reply = cas_rep(req->ccr_reply_item);
1047  struct m0_fop *req_fop = req->ccr_fop;
1048  struct m0_cas_op *op = m0_fop_data(req_fop);
1049  uint64_t i;
1050  uint64_t reply_seed;
1051  int rc = 0;
1052 
1053  M0_ASSERT(req_fop->f_type == req->ccr_ftype);
1054  M0_ASSERT(req->ccr_sent_recs_nr <= req->ccr_rec_orig.cr_nr);
1055  M0_ASSERT(reply->cgr_rep.cr_nr + rcvd_reply->cgr_rep.cr_nr <=
1056  req->ccr_max_replies_nr);
1057  *fragm_continue = false;
1058 
1059  /* Copy tx remid before fop and rpc item in `req` are set to NULL*/
1061  /*
1062  * Place reply buffers locally (without copying of actual data), zero
1063  * them in reply fop to avoid their freeing during reply fop destroying.
1064  */
1065  reply_seed = reply->cgr_rep.cr_nr;
1066  for (i = 0; i < rcvd_reply->cgr_rep.cr_nr; i++) {
1067  reply->cgr_rep.cr_rec[reply_seed + i] =
1068  rcvd_reply->cgr_rep.cr_rec[i];
1069  /* Detach buffers to avoid double-freeing of received data. */
1070  creq_kv_hold_down(&rcvd_reply->cgr_rep.cr_rec[i]);
1071  }
1072  reply->cgr_rep.cr_nr += rcvd_reply->cgr_rep.cr_nr;
1073  /* Roger, reply item is not needed anymore. */
1074  m0_rpc_item_put_lock(req->ccr_reply_item);
1075  req->ccr_reply_item = NULL;
1076  /*
1077  * Null fop data pointer to avoid cas_op freeing during fop finalisation
1078  * as cas_op is going to be reused for the rest fragments sending.
1079  */
1080  req_fop->f_data.fd_data = NULL;
1081  m0_fop_put_lock(req_fop);
1082  req->ccr_fop = NULL;
1083  if (req->ccr_sent_recs_nr < req->ccr_rec_orig.cr_nr) {
1084  /* Continue fragmentation. */
1086  if (rc == 0)
1087  *fragm_continue = true;
1088  }
1089  return rc;
1090 }
1091 
1093  struct m0_sm_ast *ast)
1094 {
1095  struct m0_cas_req *req = container_of(ast, struct m0_cas_req,
1096  ccr_replied_ast);
1097  struct m0_fop *fop = &req->ccr_asmbl_fop;
1098  struct m0_rpc_item *item = &fop->f_item;
1099  struct m0_rpc_item *reply = item->ri_reply;
1100  bool fragm_continue;
1101  int rc;
1102 
1103  rc = m0_rpc_item_error(item) ?:
1104  cas_rep(reply)->cgr_rc ?:
1106  if (rc == 0) {
1107  M0_ASSERT(M0_IN(fop->f_type, (&cas_get_fopt, &cas_cur_fopt)));
1108  if (fop->f_type == &cas_get_fopt)
1110  else
1112  }
1113  /*
1114  * On assembly request error, just continue request processing.
1115  * All records that were requested via assembly request already
1116  * have error status code.
1117  */
1118  rc = cas_req_reply_handle(req, &fragm_continue);
1119  if (rc == 0)
1120  cas_req_state_set(req, !fragm_continue ?
1122  else
1124 
1126 }
1127 
1129 {
1131  struct m0_cas_req,
1132  ccr_asmbl_fop);
1133 
1134  M0_ENTRY();
1135  req->ccr_replied_ast.sa_cb = creq_asmbl_replied_ast;
1136  m0_sm_ast_post(cas_req_smgrp(req), &req->ccr_replied_ast);
1137  M0_LEAVE();
1138 }
1139 
1140 static void cas_req_replied_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
1141 {
1142  struct m0_cas_req *req = container_of(ast, struct m0_cas_req,
1143  ccr_replied_ast);
1144  struct m0_fop_type *req_fop_type = req->ccr_fop->f_type;
1145  bool assembly_wait = false;
1146  bool suppress_err_msg;
1147  bool fragm_continue;
1148  int rc;
1149 
1150  req->ccr_reply.cgr_rc = cas_rep(req->ccr_reply_item)->cgr_rc;
1151  rc = cas_rep_validate(req);
1152  if (rc == 0) {
1153  if (M0_IN(req_fop_type, (&cas_cur_fopt, &cas_get_fopt)) &&
1154  !req->ccr_is_meta) {
1155  assembly_wait = (req_fop_type == &cas_get_fopt) ?
1157  if (assembly_wait)
1159  }
1160  if (!assembly_wait) {
1161  rc = cas_req_reply_handle(req, &fragm_continue);
1162  if (rc == 0 && !fragm_continue)
1164  }
1165  }
1166  if (rc != 0) {
1167  /*
1168  * For now CROW flag is commonly used for PUT operations. In
1169  * this case indices are physically created only on the nodes
1170  * where keys are presented. But NEXT queries are sent to all
1171  * the nodes, so some of them may return -ENOENT (index does
1172  * not exist). It is not critical, suppress these errors not
1173  * to irritate the user.
1174  */
1175  suppress_err_msg = !req->ccr_is_meta &&
1176  req_fop_type == &cas_cur_fopt && rc == -ENOENT;
1177  cas_req_failure(req, suppress_err_msg ? rc : M0_ERR(rc));
1178  }
1179 }
1180 
1182 {
1183  struct m0_cas_req *req = item_to_cas_req(item);
1184 
1185  M0_ENTRY();
1186  if (M0_FI_ENABLED("send-failure"))
1187  item->ri_error = -ENOTCONN;
1188  if (item->ri_error == 0) {
1189  M0_ASSERT(item->ri_reply != NULL);
1190  req->ccr_reply_item = item->ri_reply;
1191  /*
1192  * Get additional reference to reply item to copy reply buffers
1193  * in replied ast call.
1194  */
1196  req->ccr_replied_ast.sa_cb = cas_req_replied_ast;
1197  m0_sm_ast_post(cas_req_smgrp(req), &req->ccr_replied_ast);
1198  } else
1200  M0_LEAVE();
1201 }
1202 
1203 static int cas_index_op_prepare(const struct m0_cas_req *req,
1204  const struct m0_cas_id *cids,
1205  uint64_t cids_nr,
1206  bool recv_val,
1207  uint32_t flags,
1208  struct m0_cas_op **out)
1209 {
1210  struct m0_cas_op *op;
1211  struct m0_cas_rec *rec;
1212  int rc;
1213  uint64_t i;
1214 
1215  M0_ENTRY();
1216 
1217  rc = creq_op_alloc(cids_nr, &op);
1218  if (rc != 0)
1219  return M0_ERR(rc);
1220  op->cg_id.ci_fid = m0_cas_meta_fid;
1221  op->cg_flags = flags;
1222  rec = op->cg_rec.cr_rec;
1223  for (i = 0; i < cids_nr; i++) {
1224  struct m0_buf buf;
1225 
1226  m0_rpc_at_init(&rec[i].cr_key);
1227  /* Xcode the key to get continuous buffer for sending. */
1230  /*
1231  * Cast to avoid 'discard const' compile
1232  * error, in fact cas id element is not
1233  * changed during encoding.
1234  */
1235  (struct m0_cas_id *)&cids[i]),
1236  &buf.b_addr, &buf.b_nob);
1237  if (rc == 0) {
1238  rc = m0_rpc_at_add(&rec[i].cr_key,
1239  &buf,
1240  creq_rpc_conn(req));
1241  if (rc != 0)
1242  m0_buf_free(&buf);
1243  else if (recv_val) {
1244  m0_rpc_at_init(&rec[i].cr_val);
1245  rc = m0_rpc_at_recv(&rec[i].cr_val,
1246  creq_rpc_conn(req),
1247  sizeof(struct m0_fid),
1248  false);
1249  }
1250  }
1251  if (rc != 0)
1252  break;
1253  }
1254 
1255  if (rc != 0) {
1256  op->cg_rec.cr_nr = i + 1;
1257  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
1258  creq_op_free(op);
1259  return M0_ERR(rc);
1260  }
1261  *out = op;
1262  return M0_RC(rc);
1263 }
1264 
1265 static void addb2_add_cas_req_attrs(const struct m0_cas_req *req)
1266 {
1267  uint64_t sm_id = m0_sm_id_get(&req->ccr_sm);
1268 
1269  M0_ADDB2_ADD(M0_AVI_ATTR, sm_id,
1270  M0_AVI_CAS_REQ_ATTR_IS_META, !!req->ccr_is_meta);
1271  if (req->ccr_req_op != NULL)
1273  req->ccr_req_op->cg_rec.cr_nr);
1274 }
1275 
1277  const struct m0_cas_id *cids,
1278  uint64_t cids_nr,
1279  uint64_t max_replies_nr,
1280  bool recv_val,
1281  uint32_t flags,
1282  struct m0_cas_op **op)
1283 {
1284  struct m0_cas_recv *reply_recv;
1285  int rc;
1286 
1287  reply_recv = &req->ccr_reply.cgr_rep;
1288  M0_ALLOC_ARR(reply_recv->cr_rec, max_replies_nr);
1289  if (reply_recv->cr_rec == NULL)
1290  return M0_ERR(-ENOMEM);
1291  /* Set to 0 initially, will be increased when reply is received. */
1292  reply_recv->cr_nr = 0;
1293  req->ccr_max_replies_nr = max_replies_nr;
1294  req->ccr_is_meta = true;
1295  rc = cas_index_op_prepare(req, cids, cids_nr, recv_val, flags, op);
1296  if (rc == 0) {
1297  req->ccr_rec_orig = (*op)->cg_rec;
1298  req->ccr_req_op = *op;
1300  }
1301  if (rc != 0) {
1302  m0_free(reply_recv->cr_rec);
1303  reply_recv->cr_nr = 0;
1304  reply_recv->cr_rec = NULL;
1305  }
1306 
1307  return M0_RC(rc);
1308 
1309 }
1310 
1311 M0_INTERNAL uint64_t m0_cas_req_nr(const struct m0_cas_req *req)
1312 {
1313  return req->ccr_reply.cgr_rep.cr_nr;
1314 }
1315 
1316 M0_INTERNAL int m0_cas_req_wait(struct m0_cas_req *req, uint64_t states,
1317  m0_time_t to)
1318 {
1319  M0_ENTRY();
1321  return M0_RC(m0_sm_timedwait(&req->ccr_sm, states, to));
1322 }
1323 
1324 M0_INTERNAL int m0_cas_index_create(struct m0_cas_req *req,
1325  const struct m0_cas_id *cids,
1326  uint64_t cids_nr,
1327  struct m0_dtx *dtx)
1328 {
1329  struct m0_cas_op *op;
1330  enum m0_cas_req_state next_state;
1331  int rc;
1332 
1333  M0_ENTRY();
1334  M0_PRE(req->ccr_sess != NULL);
1336  M0_PRE(m0_forall(i, cids_nr, m0_cas_id_invariant(&cids[i])));
1337  (void)dtx;
1338  rc = cas_index_req_prepare(req, cids, cids_nr, cids_nr, false, 0, &op);
1339  if (rc != 0)
1340  return M0_ERR(rc);
1341  rc = creq_fop_create_and_prepare(req, &cas_put_fopt, op, &next_state);
1342  if (rc == 0) {
1343  cas_fop_send(req);
1344  cas_req_state_set(req, next_state);
1345  }
1346  return M0_RC(rc);
1347 }
1348 
1349 static void cas_rep_copy(const struct m0_cas_req *req,
1350  uint64_t idx,
1351  struct m0_cas_rec_reply *rep)
1352 {
1353  const struct m0_cas_recv *recv = &req->ccr_reply.cgr_rep;
1354 
1355  M0_ASSERT(idx < m0_cas_req_nr(req));
1356  rep->crr_rc = recv->cr_rec[idx].cr_rc;
1357  rep->crr_hint = recv->cr_rec[idx].cr_hint;
1358 }
1359 
1360 M0_INTERNAL void m0_cas_index_create_rep(const struct m0_cas_req *req,
1361  uint64_t idx,
1362  struct m0_cas_rec_reply *rep)
1363 {
1364  M0_ENTRY();
1365  cas_rep_copy(req, idx, rep);
1366  M0_LEAVE();
1367 }
1368 
1369 M0_INTERNAL int m0_cas_index_delete(struct m0_cas_req *req,
1370  const struct m0_cas_id *cids,
1371  uint64_t cids_nr,
1372  struct m0_dtx *dtx,
1373  uint32_t flags)
1374 {
1375  struct m0_cas_op *op;
1376  enum m0_cas_req_state next_state;
1377  int rc;
1378 
1379  M0_ENTRY();
1380  M0_PRE(req->ccr_sess != NULL);
1382  M0_PRE(m0_forall(i, cids_nr, m0_cas_id_invariant(&cids[i])));
1384  (void)dtx;
1385  rc = cas_index_req_prepare(req, cids, cids_nr, cids_nr, false, flags,
1386  &op);
1387  if (rc != 0)
1388  return M0_ERR(rc);
1389  rc = creq_fop_create_and_prepare(req, &cas_del_fopt, op, &next_state);
1390  if (rc == 0) {
1391  cas_fop_send(req);
1393  }
1394  return M0_RC(rc);
1395 }
1396 
1397 M0_INTERNAL void m0_cas_index_delete_rep(const struct m0_cas_req *req,
1398  uint64_t idx,
1399  struct m0_cas_rec_reply *rep)
1400 {
1401  M0_ENTRY();
1402  cas_rep_copy(req, idx, rep);
1403  M0_LEAVE();
1404 }
1405 
1406 M0_INTERNAL int m0_cas_index_lookup(struct m0_cas_req *req,
1407  const struct m0_cas_id *cids,
1408  uint64_t cids_nr)
1409 {
1410  struct m0_cas_op *op;
1411  enum m0_cas_req_state next_state;
1412  int rc;
1413 
1414  M0_ENTRY();
1415  M0_PRE(req->ccr_sess != NULL);
1417  M0_PRE(m0_forall(i, cids_nr, m0_cas_id_invariant(&cids[i])));
1418  rc = cas_index_req_prepare(req, cids, cids_nr, cids_nr, true, 0, &op);
1419  if (rc != 0)
1420  return M0_ERR(rc);
1421  rc = creq_fop_create_and_prepare(req, &cas_get_fopt, op, &next_state);
1422  if (rc == 0) {
1423  cas_fop_send(req);
1424  cas_req_state_set(req, next_state);
1425  }
1426  return M0_RC(rc);
1427 }
1428 
1429 M0_INTERNAL void m0_cas_index_lookup_rep(const struct m0_cas_req *req,
1430  uint64_t idx,
1431  struct m0_cas_rec_reply *rep)
1432 {
1433  M0_ENTRY();
1434  cas_rep_copy(req, idx, rep);
1435  M0_LEAVE();
1436 }
1437 
1438 M0_INTERNAL int m0_cas_index_list(struct m0_cas_req *req,
1439  const struct m0_fid *start_fid,
1440  uint32_t indices_nr,
1441  uint32_t flags)
1442 {
1443  struct m0_cas_op *op;
1444  struct m0_cas_id cid = { .ci_fid = *start_fid };
1445  enum m0_cas_req_state next_state;
1446  int rc;
1447 
1448  M0_ENTRY();
1449  M0_PRE(start_fid != NULL);
1450  M0_PRE(req->ccr_sess != NULL);
1453 
1454  rc = cas_index_req_prepare(req, &cid, 1, indices_nr, false, flags, &op);
1455  if (rc != 0)
1456  return M0_ERR(rc);
1457  op->cg_rec.cr_rec[0].cr_rc = indices_nr;
1458  rc = creq_fop_create_and_prepare(req, &cas_cur_fopt, op, &next_state);
1459  if (rc == 0) {
1460  cas_fop_send(req);
1461  cas_req_state_set(req, next_state);
1462  }
1463  return M0_RC(rc);
1464 }
1465 
1466 static int cas_next_rc(int64_t service_rc)
1467 {
1468  int rc;
1469 
1470  /*
1471  * Zero return code means some error on service side.
1472  * Service place sequence number of record starting from 1 in cr_rc on
1473  * success.
1474  */
1475  if (service_rc == 0)
1476  /*
1477  * Don't use M0_ERR() here to not pollute trace log.
1478  * Service places zero return code in all records following the
1479  * record having negative return code. It can happen in a
1480  * totally valid case when client requests more records than
1481  * available in a catalogue.
1482  */
1483  rc = -EPROTO;
1484  else if (service_rc < 0)
1485  rc = service_rc;
1486  else
1487  rc = 0;
1488  return M0_RC(rc);
1489 }
1490 
1491 M0_INTERNAL void m0_cas_index_list_rep(struct m0_cas_req *req,
1492  uint32_t idx,
1493  struct m0_cas_ilist_reply *rep)
1494 {
1495  struct m0_cas_recv *recv = &req->ccr_reply.cgr_rep;
1496  struct m0_cas_rec *rec;
1497  struct m0_buf fid;
1498 
1499  M0_ENTRY();
1500  M0_PRE(idx < m0_cas_req_nr(req));
1501 
1502  rec = &recv->cr_rec[idx];
1503  rep->clr_rc = cas_next_rc(rec->cr_rc) ?:
1504  m0_rpc_at_rep_get(NULL, &rec->cr_key, &fid);
1505  if (rep->clr_rc == 0) {
1506  rep->clr_fid = *(struct m0_fid *)fid.b_addr;
1507  rep->clr_hint = recv->cr_rec[idx].cr_hint;
1508  }
1509  M0_LEAVE();
1510 }
1511 
1513 {
1514  struct m0_cas_op *op = m0_fop_data(req->ccr_fop);
1515  int rc = 0;
1516 
1517  if (M0_FI_ENABLED("fragm_error"))
1518  return -E2BIG;
1519 
1520  M0_PRE(req->ccr_rec_orig.cr_nr != 0 &&
1521  req->ccr_rec_orig.cr_rec != NULL);
1522  M0_PRE(req->ccr_sent_recs_nr < req->ccr_rec_orig.cr_nr);
1523  /*
1524  * Flush records counter and reset records pointer to the first record
1525  * to be sent.
1526  */
1527  op->cg_rec.cr_nr = 0;
1528  op->cg_rec.cr_rec = &req->ccr_rec_orig.cr_rec[req->ccr_sent_recs_nr];
1529  do {
1530  op->cg_rec.cr_nr++;
1531  if (m0_rpc_item_max_payload_exceeded(&req->ccr_fop->f_item,
1532  req->ccr_sess)) {
1533  /*
1534  * Found the number of records when item payload exceeds
1535  * max rpc item payload, chose previous number of
1536  * records (current - 1) for sending.
1537  */
1538  op->cg_rec.cr_nr--;
1539  break;
1540  }
1541  } while (req->ccr_sent_recs_nr + op->cg_rec.cr_nr <
1542  req->ccr_rec_orig.cr_nr);
1543  if (op->cg_rec.cr_nr == 0)
1544  rc = -E2BIG; /* Almost impossible case. */
1545  if (rc != 0)
1546  /*
1547  * Restore original records vector in case of error for proper
1548  * finalisation of data structures.
1549  */
1550  op->cg_rec = req->ccr_rec_orig;
1551 
1552  return M0_RC(rc);
1553 }
1554 
1556  struct m0_cas_op *op)
1557 {
1558  int rc;
1559 
1560  rc = creq_fop_create(req, req->ccr_ftype, op);
1561  if (rc != 0)
1562  return M0_ERR(rc);
1564  if (rc == 0)
1565  cas_fop_send(req);
1566  else
1568 
1569  return M0_RC(rc);
1570 }
1571 
1572 static int cas_records_op_prepare(const struct m0_cas_req *req,
1573  const struct m0_cas_id *index,
1574  const struct m0_bufvec *keys,
1575  const struct m0_bufvec *values,
1576  uint32_t flags,
1577  struct m0_cas_op **out)
1578 {
1579  struct m0_cas_op *op;
1580  struct m0_cas_rec *rec;
1581  uint32_t keys_nr = keys->ov_vec.v_nr;
1582  uint64_t i;
1583  int rc;
1584 
1585  M0_ENTRY();
1586  rc = creq_op_alloc(keys_nr, &op);
1587  if (rc != 0)
1588  return M0_ERR(rc);
1589  op->cg_id = *index;
1590  if (m0_fid_type_getfid(&op->cg_id.ci_fid) == &m0_cctg_fid_type) {
1591  M0_ASSERT(index->ci_layout.dl_type == DIX_LTYPE_DESCR);
1592  rc = m0_dix_ldesc_copy(&op->cg_id.ci_layout.u.dl_desc,
1593  &index->ci_layout.u.dl_desc);
1594  }
1595 
1596  rec = op->cg_rec.cr_rec;
1597  for (i = 0; i < keys_nr; i++) {
1598  m0_rpc_at_init(&rec[i].cr_key);
1599  rc = creq_kv_buf_add(req, keys, i, &rec[i].cr_key);
1600  if (rc == 0 && values != NULL) {
1601  m0_rpc_at_init(&rec[i].cr_val);
1602  rc = creq_kv_buf_add(req, values, i, &rec[i].cr_val);
1603  }
1604  if (rc != 0)
1605  break;
1606  }
1607  if (rc != 0) {
1608  op->cg_rec.cr_nr = i + 1;
1609  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
1610  creq_op_free(op);
1611  return M0_ERR(rc);
1612  }
1613  op->cg_flags = flags;
1614  *out = op;
1615  return M0_RC(rc);
1616 }
1617 
1618 static int cas_req_prep(struct m0_cas_req *req,
1619  const struct m0_cas_id *index,
1620  const struct m0_bufvec *keys,
1621  const struct m0_bufvec *values,
1622  uint64_t max_replies_nr,
1623  uint32_t flags,
1624  struct m0_cas_op **op)
1625 {
1626  struct m0_cas_recv *reply_recv;
1627  int rc;
1628  M0_ENTRY();
1629 
1630  reply_recv = &req->ccr_reply.cgr_rep;
1631  M0_ALLOC_ARR(reply_recv->cr_rec, max_replies_nr);
1632  if (reply_recv->cr_rec == NULL)
1633  return M0_ERR(-ENOMEM);
1634  /* Set to 0 initially, will be increased when reply is received. */
1635  reply_recv->cr_nr = 0;
1636  req->ccr_max_replies_nr = max_replies_nr;
1637  req->ccr_is_meta = false;
1638  rc = cas_records_op_prepare(req, index, keys, values, flags, op);
1639  if (rc == 0) {
1640  req->ccr_rec_orig = (*op)->cg_rec;
1641  req->ccr_req_op = *op;
1643  }
1644  if (rc != 0) {
1645  m0_free(reply_recv->cr_rec);
1646  reply_recv->cr_nr = 0;
1647  reply_recv->cr_rec = NULL;
1648  }
1649  return M0_RC(rc);
1650 }
1651 
1652 M0_INTERNAL int m0_cas_put(struct m0_cas_req *req,
1653  struct m0_cas_id *index,
1654  const struct m0_bufvec *keys,
1655  const struct m0_bufvec *values,
1656  struct m0_dtx *dtx,
1657  uint32_t flags)
1658 {
1659  struct m0_cas_op *op;
1660  enum m0_cas_req_state next_state;
1661  int rc;
1662 
1663  M0_ENTRY();
1664  M0_PRE(keys != NULL);
1665  M0_PRE(values != NULL);
1666  M0_PRE(keys->ov_vec.v_nr == values->ov_vec.v_nr);
1668  /* Create and overwrite flags can't be specified together. */
1669  M0_PRE(!(flags & COF_CREATE) || !(flags & COF_OVERWRITE));
1670  /*
1671  * Only create, overwrite, crow, sync_wait, and skip_layout flags
1672  * are allowed.
1673  */
1674  M0_PRE((flags &
1678 
1679  rc = cas_req_prep(req, index, keys, values, keys->ov_vec.v_nr, flags,
1680  &op);
1681  if (rc != 0)
1682  return M0_ERR(rc);
1683  rc = m0_dtx0_txd_copy(dtx, &op->cg_txd);
1684  if (rc != 0)
1685  return M0_ERR(rc);
1686  rc = creq_fop_create_and_prepare(req, &cas_put_fopt, op, &next_state);
1687  if (rc == 0) {
1688  cas_fop_send(req);
1689  cas_req_state_set(req, next_state);
1690  }
1691  return M0_RC(rc);
1692 }
1693 
1694 M0_INTERNAL void m0_cas_put_rep(struct m0_cas_req *req,
1695  uint64_t idx,
1696  struct m0_cas_rec_reply *rep)
1697 {
1698  M0_ENTRY();
1699  M0_PRE(req->ccr_ftype == &cas_put_fopt);
1700  cas_rep_copy(req, idx, rep);
1701  M0_LEAVE();
1702 }
1703 
1704 static int m0_cas__get(struct m0_cas_req *req,
1705  struct m0_cas_id *index,
1706  const struct m0_bufvec *keys,
1707  int flags)
1708 {
1709  struct m0_cas_op *op;
1710  int rc;
1711  struct m0_rpc_at_buf *ab;
1712  enum m0_cas_req_state next_state;
1713  uint32_t i;
1714 
1715  M0_ENTRY();
1716  M0_PRE(keys != NULL);
1719 
1720  rc = cas_req_prep(req, index, keys, NULL, keys->ov_vec.v_nr, flags,
1721  &op);
1722  if (rc != 0)
1723  return M0_ERR(rc);
1724  for (i = 0; i < keys->ov_vec.v_nr; i++) {
1725  ab = &op->cg_rec.cr_rec[i].cr_val;
1726  m0_rpc_at_init(ab);
1728  M0_RPC_AT_UNKNOWN_LEN, false);
1729  if (rc != 0) {
1730  m0_rpc_at_fini(ab);
1731  break;
1732  }
1733  }
1734  if (rc != 0) {
1735  op->cg_rec.cr_nr = i;
1736  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
1737  creq_op_free(op);
1738  } else {
1739  req->ccr_keys = keys;
1741  &next_state);
1742  if (rc == 0) {
1743  cas_fop_send(req);
1744  cas_req_state_set(req, next_state);
1745  }
1746  }
1747  return M0_RC(rc);
1748 }
1749 
1750 M0_INTERNAL int m0_cas_get(struct m0_cas_req *req,
1751  struct m0_cas_id *index,
1752  const struct m0_bufvec *keys)
1753 {
1754  return m0_cas__get(req, index, keys, 0);
1755 }
1756 
1757 M0_INTERNAL int m0_cas_versioned_get(struct m0_cas_req *req,
1758  struct m0_cas_id *index,
1759  const struct m0_bufvec *keys)
1760 {
1761  return m0_cas__get(req, index, keys, COF_VERSIONED);
1762 }
1763 
1764 M0_INTERNAL void m0_cas_get_rep(const struct m0_cas_req *req,
1765  uint64_t idx,
1766  struct m0_cas_get_reply *rep)
1767 {
1768  const struct m0_cas_rep *cas_rep = &req->ccr_reply;
1769  struct m0_cas_rec *sent;
1770  struct m0_cas_rec *rcvd;
1771 
1772  M0_ENTRY();
1773  M0_PRE(idx < m0_cas_req_nr(req));
1774  M0_PRE(req->ccr_ftype == &cas_get_fopt);
1775  rcvd = &cas_rep->cgr_rep.cr_rec[idx];
1776  sent = &req->ccr_rec_orig.cr_rec[idx];
1777  rep->cge_rc = rcvd->cr_rc;
1778  rep->cge_ver = rcvd->cr_ver;
1779  if (rep->cge_rc == 0)
1780  m0_rpc_at_rep_get(&sent->cr_val, &rcvd->cr_val, &rep->cge_val);
1781  M0_LEAVE();
1782 }
1783 
1784 M0_INTERNAL int m0_cas_next(struct m0_cas_req *req,
1785  struct m0_cas_id *index,
1786  struct m0_bufvec *start_keys,
1787  uint32_t *recs_nr,
1788  uint32_t flags)
1789 {
1790  struct m0_cas_op *op;
1791  enum m0_cas_req_state next_state;
1792  uint64_t max_replies_nr = 0;
1793  uint32_t i;
1794  int rc;
1795 
1796  M0_ENTRY();
1797  M0_PRE(start_keys != NULL);
1800  /* Only slant, exclude start key, and versioned flags are allowed. */
1802  COF_VERSIONED | COF_SHOW_DEAD)) == 0);
1803  /* COF_SHOW_DEAD cannot be used without COF_VERSIONED */
1804  M0_PRE(ergo((flags & COF_SHOW_DEAD) != 0,
1805  (flags & COF_VERSIONED) != 0));
1806 
1807  for (i = 0; i < start_keys->ov_vec.v_nr; i++)
1808  max_replies_nr += recs_nr[i];
1809  rc = cas_req_prep(req, index, start_keys, NULL, max_replies_nr, flags,
1810  &op);
1811  if (rc != 0)
1812  return M0_ERR(rc);
1813  for (i = 0; i < start_keys->ov_vec.v_nr; i++)
1814  op->cg_rec.cr_rec[i].cr_rc = recs_nr[i];
1815  req->ccr_keys = start_keys;
1817  &next_state);
1818  if (rc == 0) {
1819  cas_fop_send(req);
1820  cas_req_state_set(req, next_state);
1821  }
1822  return M0_RC(rc);
1823 }
1824 
1825 M0_INTERNAL void m0_cas_rep_mlock(const struct m0_cas_req *req,
1826  uint64_t idx)
1827 {
1828  const struct m0_cas_rep *cas_rep = &req->ccr_reply;
1829 
1830  M0_PRE(M0_IN(req->ccr_ftype, (&cas_get_fopt, &cas_cur_fopt)));
1832 }
1833 
1834 M0_INTERNAL void m0_cas_next_rep(const struct m0_cas_req *req,
1835  uint32_t idx,
1836  struct m0_cas_next_reply *rep)
1837 {
1838  const struct m0_cas_rep *cas_rep = &req->ccr_reply;
1839  struct m0_cas_rec *rcvd;
1840 
1841  M0_ENTRY();
1842  M0_PRE(idx < m0_cas_req_nr(req));
1843  M0_PRE(req->ccr_ftype == &cas_cur_fopt);
1844  rcvd = &cas_rep->cgr_rep.cr_rec[idx];
1845  rep->cnp_ver = rcvd->cr_ver;
1846  rep->cnp_rc = cas_next_rc(rcvd->cr_rc) ?:
1847  m0_rpc_at_rep_get(NULL, &rcvd->cr_key, &rep->cnp_key) ?:
1848  m0_rpc_at_rep_get(NULL, &rcvd->cr_val, &rep->cnp_val);
1849 }
1850 
1851 M0_INTERNAL int m0_cas_del(struct m0_cas_req *req,
1852  struct m0_cas_id *index,
1853  struct m0_bufvec *keys,
1854  struct m0_dtx *dtx,
1855  uint32_t flags)
1856 {
1857  struct m0_cas_op *op;
1858  enum m0_cas_req_state next_state;
1859  int rc;
1860 
1861  M0_ENTRY();
1862  M0_PRE(keys != NULL);
1866 
1867  rc = cas_req_prep(req, index, keys, NULL, keys->ov_vec.v_nr, flags,
1868  &op);
1869  if (rc != 0)
1870  return M0_ERR(rc);
1871  rc = m0_dtx0_txd_copy(dtx, &op->cg_txd);
1872  if (rc != 0)
1873  return M0_ERR(rc);
1874  rc = creq_fop_create_and_prepare(req, &cas_del_fopt, op, &next_state);
1875  if (rc == 0) {
1876  cas_fop_send(req);
1877  cas_req_state_set(req, next_state);
1878  }
1879  return M0_RC(rc);
1880 }
1881 
1882 M0_INTERNAL void m0_cas_del_rep(struct m0_cas_req *req,
1883  uint64_t idx,
1884  struct m0_cas_rec_reply *rep)
1885 {
1886  M0_ENTRY();
1887  M0_PRE(req->ccr_ftype == &cas_del_fopt);
1888  cas_rep_copy(req, idx, rep);
1889  M0_LEAVE();
1890 }
1891 
1892 M0_INTERNAL int m0_cas_sm_conf_init(void)
1893 {
1898 }
1899 
1900 M0_INTERNAL void m0_cas_sm_conf_fini(void)
1901 {
1904 }
1905 
1906 #undef M0_TRACE_SUBSYSTEM
1907 
1910 /*
1911  * Local variables:
1912  * c-indentation-style: "K&R"
1913  * c-basic-offset: 8
1914  * tab-width: 8
1915  * fill-column: 80
1916  * scroll-step: 1
1917  * End:
1918  */
1919 /*
1920  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1921  */
M0_INTERNAL void m0_cas_req_fini(struct m0_cas_req *req)
Definition: client.c:291
void * fd_data
Definition: fop.h:76
static void creq_kv_hold_down(struct m0_cas_rec *rec)
Definition: client.c:313
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
struct m0_rpc_at_buf cr_val
Definition: cas.h:301
#define M0_PRE(cond)
static int cas_req_prep(struct m0_cas_req *req, const struct m0_cas_id *index, const struct m0_bufvec *keys, const struct m0_bufvec *values, uint64_t max_replies_nr, uint32_t flags, struct m0_cas_op **op)
Definition: client.c:1618
M0_INTERNAL void m0_sm_conf_init(struct m0_sm_conf *conf)
Definition: sm.c:340
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
Definition: dtm.h:554
struct m0_cas_rec * cni_rep
Definition: client.c:64
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
static int greq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:676
static struct m0_cas_req * item_to_cas_req(struct m0_rpc_item *item)
Definition: client.c:444
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
static void cas_to_rpc_map(const struct m0_cas_req *creq, const struct m0_rpc_item *item)
Definition: client.c:148
struct m0_rpc_at_buf ck_key
Definition: cas.h:127
static int cas_req_fragmentation(struct m0_cas_req *req)
Definition: client.c:1512
int const char const void size_t int flags
Definition: dir.c:328
static void creq_fop_destroy(struct m0_cas_req *req)
Definition: client.c:361
M0_INTERNAL int m0_sm_addb2_init(struct m0_sm_conf *conf, uint64_t id, uint64_t counter)
Definition: sm.c:846
static int creq_fop_create(struct m0_cas_req *req, struct m0_fop_type *ftype, struct m0_cas_op *op)
Definition: client.c:397
#define NULL
Definition: misc.h:38
struct m0_cas_recv * cni_repv
Definition: client.c:68
static const struct m0_rpc_item_ops cas_item_ops
Definition: client.c:78
Definition: idx_mock.c:52
M0_INTERNAL int m0_cas_index_list(struct m0_cas_req *req, const struct m0_fid *start_fid, uint32_t indices_nr, uint32_t flags)
Definition: client.c:1438
#define ergo(a, b)
Definition: misc.h:293
Definition: sm.h:350
static int cas_records_op_prepare(const struct m0_cas_req *req, const struct m0_cas_id *index, const struct m0_bufvec *keys, const struct m0_bufvec *values, uint32_t flags, struct m0_cas_op **out)
Definition: client.c:1572
static struct io_request req
Definition: file.c:100
uint64_t cv_nr
Definition: cas.h:135
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:78
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
static struct m0_sm_trans_descr cas_req_trans[]
Definition: client.c:121
M0_LEAVE()
M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
Definition: at.c:433
static void creq_niter_init(struct creq_niter *it, struct m0_cas_op *op, struct m0_cas_rep *rep)
Definition: client.c:760
static void cas_req_state_set(struct m0_cas_req *req, enum m0_cas_req_state state)
Definition: client.c:243
M0_INTERNAL bool m0_cas_req_is_locked(const struct m0_cas_req *req)
Definition: client.c:238
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL bool m0_rpc_at_is_set(const struct m0_rpc_at_buf *ab)
Definition: at.c:492
static void creq_asmbl_fop_release(struct m0_ref *ref)
Definition: client.c:383
Definition: cas.h:247
struct m0_vec ov_vec
Definition: vec.h:147
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
M0_INTERNAL int m0_dtx0_txd_copy(const struct m0_dtx *dtx, struct m0_dtm0_tx_desc *dst)
Definition: dtx.c:544
M0_INTERNAL void m0_cas_id_fini(struct m0_cas_id *cid)
Definition: cas.c:199
M0_INTERNAL void m0_cas_get_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_get_reply *rep)
Definition: client.c:1764
struct m0_sm ri_sm
Definition: item.h:181
M0_INTERNAL const struct m0_fid m0_cas_meta_fid
Definition: cas.c:147
static bool nreq_asmbl_post(struct m0_cas_req *req)
Definition: client.c:895
static int sum
Definition: rwlock.c:53
int32_t ri_error
Definition: item.h:161
static void cas_rep_copy(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1349
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:219
static struct m0_be_emap_cursor it
Definition: extmap.c:46
M0_INTERNAL const char * m0_sm_state_name(const struct m0_sm *mach, int state)
Definition: sm.c:781
struct m0_cas_recv cgr_rep
Definition: cas.h:431
M0_INTERNAL void m0_cas_index_create_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1360
M0_INTERNAL int m0_rpc_at_add(struct m0_rpc_at_buf *ab, const struct m0_buf *buf, const struct m0_rpc_conn *conn)
Definition: at.c:462
#define M0_BITS(...)
Definition: misc.h:236
Definition: sm.h:504
static void cas_fop_send(struct m0_cas_req *req)
Definition: client.c:579
struct m0_fop ccr_asmbl_fop
Definition: client.h:137
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
#define CASREQ_FOP_DATA(fop)
Definition: client.c:74
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
M0_INTERNAL bool m0_sm_addb2_counter_init(struct m0_sm *sm)
Definition: sm.c:891
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
void ** ov_buf
Definition: vec.h:149
M0_INTERNAL int m0_cas_get(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys)
Definition: client.c:1750
Definition: sock.c:887
struct m0_rpc_at_buf cr_val
Definition: cas.h:182
M0_INTERNAL void m0_dtm0_tx_desc_init_none(struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:39
static void cas_req_failure_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: client.c:549
static void creq_rep_override(struct m0_cas_rec *orig, struct m0_cas_rec *new)
Definition: client.c:947
M0_INTERNAL int m0_rpc_at_rep_get(struct m0_rpc_at_buf *sent, struct m0_rpc_at_buf *rcvd, struct m0_buf *out)
Definition: at.c:606
static void creq_asmbl_fop_init(struct m0_cas_req *req, struct m0_fop_type *ftype, struct m0_cas_op *op)
Definition: client.c:610
M0_INTERNAL void m0_cas_req_unlock(struct m0_cas_req *req)
Definition: client.c:232
M0_INTERNAL bool m0_cas_id_invariant(const struct m0_cas_id *cid)
Definition: cas.c:208
M0_INTERNAL int m0_dix_ldesc_copy(struct m0_dix_ldesc *dst, const struct m0_dix_ldesc *src)
Definition: layout.c:189
uint64_t cni_req_i
Definition: client.c:69
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL void m0_cas_req_lock(struct m0_cas_req *req)
Definition: client.c:226
static struct m0_sm_group * cas_req_smgrp(const struct m0_cas_req *req)
Definition: client.c:221
return M0_RC(rc)
op
Definition: libdemo.c:64
struct m0_sm ccr_sm
Definition: client.h:125
#define M0_ENTRY(...)
Definition: trace.h:170
static int cas_req_fragment_continue(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:1555
Definition: buf.h:37
struct m0_cas_rec * cr_rec
Definition: cas.h:236
static void nreq_asmbl_accept(struct m0_cas_req *req)
Definition: client.c:963
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
static void cas_req_replied_cb(struct m0_rpc_item *item)
Definition: client.c:1181
struct m0_rpc_at_buf ck_val
Definition: cas.h:128
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
struct m0_rpc_at_buf cr_key
Definition: cas.h:172
static int cas_index_op_prepare(const struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, bool recv_val, uint32_t flags, struct m0_cas_op **out)
Definition: client.c:1203
int i
Definition: dir.c:1033
struct m0_fop_type * f_type
Definition: fop.h:82
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
uint64_t cr_nr
Definition: cas.h:235
m0_cas_req_state
Definition: client.h:109
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
M0_INTERNAL int m0_cas_req_generic_rc(const struct m0_cas_req *req)
Definition: client.c:460
static const struct m0_rpc_item_ops asmbl_item_ops
Definition: client.c:85
Definition: cas.h:126
M0_INTERNAL bool m0_rpc_at_rep_is_bulk(const struct m0_rpc_at_buf *rcvd, uint64_t *len)
Definition: at.c:651
M0_INTERNAL void m0_cas_req_init(struct m0_cas_req *req, struct m0_rpc_session *sess, struct m0_sm_group *grp)
Definition: client.c:198
static int key
Definition: locality.c:283
static void cas_req_replied_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: client.c:1140
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
if(value==NULL)
Definition: dir.c:350
static void cas_req_failure_ast_post(struct m0_cas_req *req, int32_t rc)
Definition: client.c:559
M0_INTERNAL void m0_cas_put_rep(struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1694
Definition: cas.h:264
M0_INTERNAL int m0_cas_next(struct m0_cas_req *req, struct m0_cas_id *index, struct m0_bufvec *start_keys, uint32_t *recs_nr, uint32_t flags)
Definition: client.c:1784
M0_INTERNAL int m0_cas_versioned_get(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys)
Definition: client.c:1757
static struct m0_cas_rep * cas_rep(struct m0_rpc_item *reply)
Definition: client.c:455
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL void m0_cas_next_rep(const struct m0_cas_req *req, uint32_t idx, struct m0_cas_next_reply *rep)
Definition: client.c:1834
struct m0_xcode_type * m0_cas_id_xc
Definition: cas_xc.c:11
static int nreq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:841
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
uint64_t cr_nr
Definition: cas.h:284
static struct m0_rpc_machine * creq_rpc_mach(const struct m0_cas_req *req)
Definition: client.c:216
M0_INTERNAL int m0_cas_sm_conf_init(void)
Definition: client.c:1892
struct m0_crv cr_ver
Definition: cas.h:228
M0_INTERNAL bool m0_rpc_item_max_payload_exceeded(struct m0_rpc_item *item, struct m0_rpc_session *session)
Definition: item.c:490
static int greq_asmbl_add(struct m0_cas_req *req, struct m0_cas_rec *rec, uint64_t idx, uint64_t orig_idx, uint64_t vlen)
Definition: client.c:626
uint64_t cni_kpos
Definition: client.c:71
M0_INTERNAL int m0_xcode_obj_enc_to_buf(struct m0_xcode_obj *obj, void **buf, m0_bcount_t *len)
Definition: xcode.c:832
M0_INTERNAL struct m0_fop_type cas_get_fopt
Definition: cas.c:48
static struct m0_sm_state_descr cas_req_states[]
Definition: client.c:90
M0_INTERNAL int m0_cas_del(struct m0_cas_req *req, struct m0_cas_id *index, struct m0_bufvec *keys, struct m0_dtx *dtx, uint32_t flags)
Definition: client.c:1851
M0_INTERNAL const struct m0_fid_type m0_cctg_fid_type
Definition: cas.c:164
static int nreq_asmbl_prep(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:808
struct m0_cas_kv * cv_rec
Definition: cas.h:136
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL const struct m0_fid_type * m0_fid_type_getfid(const struct m0_fid *fid)
Definition: fid.c:76
struct m0_cas_rec * cni_req
Definition: client.c:63
void * f_opaque
Definition: fop.h:85
#define M0_POST(cond)
M0_INTERNAL struct m0_fop_type cas_cur_fopt
Definition: cas.c:51
M0_INTERNAL void m0_sm_addb2_fini(struct m0_sm_conf *conf)
Definition: sm.c:870
static int cas_rep_validate(const struct m0_cas_req *req)
Definition: client.c:534
struct m0_sm_conf cas_req_sm_conf
Definition: client.c:135
uint32_t v_nr
Definition: vec.h:51
static void creq_recv_fini(struct m0_cas_recv *recv, bool op_is_meta)
Definition: client.c:332
struct m0_fid ci_fid
Definition: cas.h:113
M0_INTERNAL int m0_cas_index_create(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, struct m0_dtx *dtx)
Definition: client.c:1324
M0_INTERNAL void m0_cas_index_list_rep(struct m0_cas_req *req, uint32_t idx, struct m0_cas_ilist_reply *rep)
Definition: client.c:1491
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
m0_bcount_t * v_count
Definition: vec.h:53
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:135
struct m0_mutex s_lock
Definition: sm.h:514
void(* rio_sent)(struct m0_rpc_item *item)
Definition: item.h:267
struct m0_cas_kv_vec cr_kv_bufs
Definition: cas.h:195
struct m0_sm_ast ccr_failure_ast
Definition: client.h:157
struct m0_cas_recv cg_rec
Definition: cas.h:388
struct m0_cas_hint cr_hint
Definition: cas.h:200
Definition: cas.h:376
static int creq_fop_create_and_prepare(struct m0_cas_req *req, struct m0_fop_type *ftype, struct m0_cas_op *op, enum m0_cas_req_state *next_state)
Definition: client.c:417
M0_INTERNAL int m0_cas_index_delete(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, struct m0_dtx *dtx, uint32_t flags)
Definition: client.c:1369
static uint64_t greq_asmbl_count(const struct m0_cas_req *req)
Definition: client.c:651
static void creq_op_free(struct m0_cas_op *op)
Definition: client.c:188
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
#define m0_forall(var, nr,...)
Definition: misc.h:112
uint32_t sd_flags
Definition: sm.h:378
M0_INTERNAL struct m0_fop_type cas_del_fopt
Definition: cas.c:50
uint64_t cni_rep_i
Definition: client.c:70
static int creq_niter_next(struct creq_niter *it)
Definition: client.c:773
M0_INTERNAL void m0_cas_sm_conf_fini(void)
Definition: client.c:1900
static struct m0_rpc_conn * creq_rpc_conn(const struct m0_cas_req *req)
Definition: client.c:211
struct m0_fop_data f_data
Definition: fop.h:83
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
M0_INTERNAL void m0_cas_index_delete_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1397
M0_INTERNAL void m0_cas_req_fini_lock(struct m0_cas_req *req)
Definition: client.c:298
M0_INTERNAL void m0_cas_del_rep(struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1882
M0_INTERNAL uint64_t m0_cas_req_nr(const struct m0_cas_req *req)
Definition: client.c:1311
void m0_rpc_item_put_lock(struct m0_rpc_item *item)
Definition: item.c:454
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
struct m0_ref f_ref
Definition: fop.h:81
Definition: fid.h:38
static void creq_asmbl_replied_cb(struct m0_rpc_item *item)
Definition: client.c:1128
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
static bool greq_asmbl_post(struct m0_cas_req *req)
Definition: client.c:718
#define M0_IS0(obj)
Definition: misc.h:70
M0_INTERNAL int m0_cas_index_lookup(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr)
Definition: client.c:1406
static int creq_op_alloc(uint64_t recs_nr, struct m0_cas_op **out)
Definition: client.c:163
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
static int cas_next_rc(int64_t service_rc)
Definition: client.c:1466
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
static void creq_item_prepare(const struct m0_cas_req *req, struct m0_rpc_item *item, const struct m0_rpc_item_ops *ops)
Definition: client.c:568
static void addb2_add_cas_req_attrs(const struct m0_cas_req *req)
Definition: client.c:1265
static void creq_niter_fini(struct creq_niter *it)
Definition: client.c:799
M0_INTERNAL int m0_cas_put(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys, const struct m0_bufvec *values, struct m0_dtx *dtx, uint32_t flags)
Definition: client.c:1652
struct m0_sm_ast ccr_replied_ast
Definition: client.h:155
struct m0_rpc_session * ri_session
Definition: item.h:147
static struct m0_rpc_item * cas_req_to_item(const struct m0_cas_req *req)
Definition: client.c:450
static int m0_cas__get(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys, int flags)
Definition: client.c:1704
M0_INTERNAL int m0_cas_req_wait(struct m0_cas_req *req, uint64_t states, m0_time_t to)
Definition: client.c:1316
int32_t cgr_rc
Definition: cas.h:420
M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
Definition: at.c:441
struct m0_rpc_at_buf cr_key
Definition: cas.h:291
static bool creq_niter_invariant(struct creq_niter *it)
Definition: client.c:754
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:198
static int creq_kv_buf_add(const struct m0_cas_req *req, const struct m0_bufvec *kv, uint32_t idx, struct m0_rpc_at_buf *buf)
Definition: client.c:595
static struct m0_fop * fop
Definition: item.c:57
static void greq_asmbl_accept(struct m0_cas_req *req)
Definition: client.c:998
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:345
static void cas_req_reply_fini(struct m0_cas_req *req)
Definition: client.c:252
#define M0_XCODE_OBJ(type, ptr)
Definition: xcode.h:962
M0_INTERNAL struct m0_fop_type cas_put_fopt
Definition: cas.c:49
M0_INTERNAL void m0_cas_rep_mlock(const struct m0_cas_req *req, uint64_t idx)
Definition: client.c:1825
static void cas_req_fini(struct m0_cas_req *req)
Definition: client.c:264
static int cas_req_reply_handle(struct m0_cas_req *req, bool *fragm_continue)
Definition: client.c:1042
#define out(...)
Definition: gen.c:41
static void cas_req_fsync_remid_copy(struct m0_cas_req *req)
Definition: client.c:1029
M0_INTERNAL void m0_sm_conf_fini(struct m0_sm_conf *conf)
Definition: sm.c:376
static int cas_rep__validate(const struct m0_fop_type *ftype, struct m0_cas_op *op, struct m0_cas_rep *rep)
Definition: client.c:485
static void creq_asmbl_replied_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: client.c:1092
struct m0_cas_recv * cni_reqv
Definition: client.c:67
struct m0_fom_ops ops
Definition: io_foms.c:623
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
static void cas_req_failure(struct m0_cas_req *req, int32_t rc)
Definition: client.c:543
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
M0_INTERNAL int m0_rpc_at_rep2inline(struct m0_rpc_at_buf *sent, struct m0_rpc_at_buf *rcvd)
Definition: at.c:663
void m0_free(void *data)
Definition: memory.c:146
uint64_t cr_rc
Definition: cas.h:221
struct m0_rpc_item f_item
Definition: fop.h:84
Definition: cas.h:107
#define M0_BUF_INIT(size, data)
Definition: buf.h:64
M0_INTERNAL void m0_dtm0_tx_desc_fini(struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:110
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_cas_index_lookup_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1429
static struct m0_sm_state_descr states[C_NR]
Definition: sm.c:512
static int cas_index_req_prepare(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, uint64_t max_replies_nr, bool recv_val, uint32_t flags, struct m0_cas_op **op)
Definition: client.c:1276
const m0_time_t M0_TIME_IMMEDIATELY
Definition: time.c:107
Definition: fop.h:80
static void creq_fop_release(struct m0_ref *ref)
Definition: client.c:371
struct m0_cas_id cg_id
Definition: cas.h:378
static bool fid_is_meta(struct m0_fid *fid)
Definition: client.c:157
Definition: vec.h:145
M0_INTERNAL void m0_rpc_at_detach(struct m0_rpc_at_buf *ab)
Definition: at.c:694
M0_INTERNAL int m0_rpc_at_recv(struct m0_rpc_at_buf *ab, const struct m0_rpc_conn *conn, uint32_t len, bool force_bulk)
Definition: at.c:508
static bool cas_rep_val_is_valid(struct m0_rpc_at_buf *val, struct m0_fid *idx_fid)
Definition: client.c:479
m0_time_t ri_deadline
Definition: item.h:141
Definition: idx_mock.c:47
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331