Motr  M0
proxy.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CM
24 
25 #include "lib/memory.h"
26 #include "lib/errno.h"
27 #include "lib/trace.h"
28 #include "lib/time.h"
29 #include "lib/misc.h"
30 #include "lib/locality.h"
31 
32 #include "rpc/rpc.h"
33 #include "rpc/session.h"
34 #include "motr/magic.h"
35 #include "motr/setup.h" /* CS_MAX_EP_ADDR_LEN */
36 #include "fop/fom.h"
37 
38 #include "cm/cm.h"
39 #include "cm/cp.h"
40 #include "cm/proxy.h"
41 #include "cm/ag.h"
42 
49 enum {
51 };
52 
53 M0_TL_DESCR_DEFINE(proxy, "copy machine proxy", M0_INTERNAL,
54  struct m0_cm_proxy, px_linkage, px_magic,
56 
57 M0_TL_DEFINE(proxy, M0_INTERNAL, struct m0_cm_proxy);
58 
59 M0_TL_DESCR_DEFINE(proxy_fail, "copy machine proxy", M0_INTERNAL,
60  struct m0_cm_proxy, px_fail_linkage, px_magic,
62 
63 M0_TL_DEFINE(proxy_fail, M0_INTERNAL, struct m0_cm_proxy);
64 
65 M0_TL_DESCR_DEFINE(proxy_cp, "pending copy packets", M0_INTERNAL,
66  struct m0_cm_cp, c_cm_proxy_linkage, c_magix,
68 
69 M0_TL_DEFINE(proxy_cp, M0_INTERNAL, struct m0_cm_cp);
70 
71 static const struct m0_bob_type proxy_bob = {
72  .bt_name = "cm proxy",
73  .bt_magix_offset = M0_MAGIX_OFFSET(struct m0_cm_proxy, px_magic),
74  .bt_magix = CM_PROXY_LINK_MAGIC,
75  .bt_check = NULL
76 };
77 
79 
80 static bool cm_proxy_invariant(const struct m0_cm_proxy *pxy)
81 {
85  return _0C(pxy != NULL) && _0C(m0_cm_proxy_bob_check(pxy)) &&
86  _0C(m0_cm_is_locked(pxy->px_cm)) &&
87  _0C(pxy->px_endpoint != NULL);
88 }
89 
90 M0_INTERNAL int m0_cm_proxy_init(struct m0_cm_proxy *proxy, uint64_t px_id,
91  struct m0_cm_ag_id *lo, struct m0_cm_ag_id *hi,
92  const char *endpoint)
93 {
94  M0_PRE(proxy != NULL && lo != NULL && hi != NULL && endpoint != NULL);
95 
96  m0_cm_proxy_bob_init(proxy);
97  proxy_tlink_init(proxy);
98  proxy_fail_tlink_init(proxy);
99  m0_mutex_init(&proxy->px_mutex);
100  proxy_cp_tlist_init(&proxy->px_pending_cps);
101  proxy->px_id = px_id;
102  proxy->px_sw.sw_lo = *lo;
103  proxy->px_sw.sw_hi = *hi;
104  proxy->px_endpoint = endpoint;
105  proxy->px_is_done = false;
106  proxy->px_epoch = 0;
107  return 0;
108 }
109 
110 M0_INTERNAL void m0_cm_proxy_add(struct m0_cm *cm, struct m0_cm_proxy *pxy)
111 {
112  M0_ENTRY("cm: %p proxy: %p", cm, pxy);
114  M0_PRE(!proxy_tlink_is_in(pxy));
115  pxy->px_cm = cm;
116  proxy_tlist_add_tail(&cm->cm_proxies, pxy);
119  M0_ASSERT(proxy_tlink_is_in(pxy));
121  M0_LEAVE();
122 }
123 
124 M0_INTERNAL void m0_cm_proxy_del(struct m0_cm *cm, struct m0_cm_proxy *pxy)
125 {
126  M0_ENTRY("cm: %p proxy: %p", cm, pxy);
128  M0_PRE(proxy_tlink_is_in(pxy));
129  if (proxy_fail_tlink_is_in(pxy))
130  proxy_fail_tlist_del(pxy);
131  proxy_fail_tlink_fini(pxy);
132  proxy_tlink_del_fini(pxy);
134  M0_ASSERT(!proxy_tlink_is_in(pxy));
136  M0_LEAVE();
137 }
138 
139 M0_INTERNAL void m0_cm_proxy_cp_add(struct m0_cm_proxy *pxy,
140  struct m0_cm_cp *cp)
141 {
142  M0_ENTRY("proxy: %p cp: %p ep: %s", pxy, cp, pxy->px_endpoint);
144  M0_PRE(!proxy_cp_tlink_is_in(cp));
145 
146  proxy_cp_tlist_add_tail(&pxy->px_pending_cps, cp);
147  ID_LOG("proxy ag_id", &cp->c_ag->cag_id);
148  M0_POST(proxy_cp_tlink_is_in(cp));
149  M0_LEAVE();
150 }
151 
152 static void cm_proxy_cp_del(struct m0_cm_proxy *pxy,
153  struct m0_cm_cp *cp)
154 {
156  M0_PRE(proxy_cp_tlink_is_in(cp));
157  proxy_cp_tlist_del(cp);
158  M0_POST(!proxy_cp_tlink_is_in(cp));
159 }
160 
161 M0_INTERNAL struct m0_cm_proxy *m0_cm_proxy_locate(struct m0_cm *cm,
162  const char *addr)
163 {
164  struct m0_cm_proxy *pxy = NULL;
165  struct m0_net_ip_addr addr_ipaddr, pxy_ipaddr;
166 
167  /*
168  * Proxy address string (pxy->px_endpoint) cannot be directly compared
169  * with supplied address string, because the same end-point might have
170  * different address strings.
171  *
172  * Convert both address strings into m0_net_ip_addr objects, and
173  * compare them using m0_net_ip_addr_eq() API
174  */
175 
176  if (m0_net_ip_parse(addr, &addr_ipaddr) != 0)
177  return NULL;
178  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
179  if (m0_net_ip_parse(pxy->px_endpoint, &pxy_ipaddr) != 0)
180  return NULL;
181  if (m0_net_ip_addr_eq(&addr_ipaddr, &pxy_ipaddr, true))
182  break;
183  } m0_tl_endfor;
184  return pxy;
185 }
186 
187 static void __wake_up_pending_cps(struct m0_cm_proxy *pxy)
188 {
189  struct m0_cm_cp *cp;
190 
191  m0_tl_for(proxy_cp, &pxy->px_pending_cps, cp) {
192  cm_proxy_cp_del(pxy, cp);
193  /* wakeup pending copy packet foms */
194  m0_fom_wakeup(&cp->c_fom);
195  } m0_tl_endfor;
196 
197 }
198 
199 static bool epoch_check(struct m0_cm_proxy *pxy, m0_time_t px_epoch)
200 {
201  if (px_epoch != pxy->px_epoch) {
202  M0_LOG(M0_WARN, "Mismatch Epoch,"
203  "current: %llu" "received: %llu",
204  (unsigned long long)pxy->px_epoch,
205  (unsigned long long)px_epoch);
206 
207  return false;
208  }
209 
210  return true;
211 }
212 
213 static void proxy_done(struct m0_cm_proxy *proxy)
214 {
215  struct m0_cm *cm = proxy->px_cm;
216  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s",
217  proxy, proxy->px_id, proxy->px_endpoint);
218 
219  if (!proxy->px_is_done) {
220  proxy->px_is_done = true;
221  m0_cm_notify(cm);
222  }
224  M0_LEAVE();
225 }
226 
232 static void _sw_update(struct m0_cm_proxy *pxy, struct m0_cm_sw *in_interval,
233  struct m0_cm_sw *out_interval, uint32_t px_status)
234 {
235  ID_LOG("proxy lo", &pxy->px_sw.sw_lo);
236  ID_LOG("proxy hi", &pxy->px_sw.sw_hi);
237 
238  if (m0_cm_sw_cmp(in_interval, &pxy->px_sw) > 0)
239  m0_cm_sw_copy(&pxy->px_sw, in_interval);
240  if (m0_cm_sw_cmp(out_interval, &pxy->px_out_interval) > 0)
241  m0_cm_sw_copy(&pxy->px_out_interval, out_interval);
242  pxy->px_status = px_status;
245 }
246 
247 static int px_ready(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
248  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
249  uint32_t px_status)
250 {
251  struct m0_cm *cm = p->px_cm;
252  struct m0_cm_ag_id hi;
253  int rc = 0;
254  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s", p, p->px_id, p->px_endpoint);
255 
256  if (p->px_epoch == 0 && m0_cm_state_get(cm) == M0_CMS_READY) {
257  p->px_epoch = px_epoch;
258  p->px_status = px_status;
259  hi = in_interval->sw_hi;
260  /*
261  * Here we select the minimum of the sliding window
262  * starting point provided by each remote copy machine,
263  * from which this copy machine will start in-order to
264  * keep all the copy machines in sync.
265  */
268  }
270  rc = 0;
271  } else if (m0_cm_state_get(cm) < M0_CMS_READY)
272  rc = -EINVAL;
273 
274  return M0_RC(rc);
275 }
276 
277 static int px_active(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
278  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
279  uint32_t px_status)
280 {
281  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s", p, p->px_id, p->px_endpoint);
282  _sw_update(p, in_interval, out_interval, px_status);
283  /* TODO This is expensive during M0_CMS_CTIVE phase but needed to
284  * handle cleanup in case of copy machine failures during active
285  * phase. Try to find another alternative.
286  */
287  m0_cm_frozen_ag_cleanup(p->px_cm, p);
288  return M0_RC(0);
289 }
290 
291 static int px_complete(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
292  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
293  uint32_t px_status)
294 {
295  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s", p, p->px_id, p->px_endpoint);
296  _sw_update(p, in_interval, out_interval, px_status);
297  m0_cm_frozen_ag_cleanup(p->px_cm, p);
298  return M0_RC(0);
299 }
300 
301 static int px_stop_fail(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
302  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
303  uint32_t px_status)
304 {
305  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s state=%u",
306  p, p->px_id, p->px_endpoint, px_status);
307  _sw_update(p, in_interval, out_interval, px_status);
308  m0_cm_frozen_ag_cleanup(p->px_cm, p);
309  proxy_done(p);
310  return M0_RC(0);
311 }
312 
313 static int (*px_action[])(struct m0_cm_proxy *px, struct m0_cm_sw *in_interval,
314  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
315  uint32_t px_status) = {
316  [M0_PX_READY] = px_ready,
321 };
322 
323 M0_INTERNAL int m0_cm_proxy_update(struct m0_cm_proxy *pxy,
324  struct m0_cm_sw *in_interval,
325  struct m0_cm_sw *out_interval,
326  uint32_t px_status,
327  m0_time_t px_epoch)
328 {
329  struct m0_cm *cm;
330  int rc;
331 
332  M0_ENTRY("proxy: %p ep: %s", pxy, pxy->px_endpoint);
333  M0_PRE(pxy != NULL && in_interval != NULL && out_interval != NULL);
334 
335  m0_cm_proxy_lock(pxy);
336  cm = pxy->px_cm;
338  M0_LOG(M0_DEBUG, "Recvd from :%s status: %u curr_status: %u "
339  "nr_updates: %u", pxy->px_endpoint, px_status,
340  pxy->px_status, (unsigned)cm->cm_nr_proxy_updated);
341 
342  if (px_status < pxy->px_status) {
343  m0_cm_proxy_unlock(pxy);
344  return M0_RC(-EINVAL);
345  }
346 
347  if (pxy->px_status != M0_PX_INIT && !epoch_check(pxy, px_epoch)) {
348  m0_cm_proxy_unlock(pxy);
349  return M0_RC(-EINVAL);
350  }
351 
352  if (px_status >= M0_PX_COMPLETE &&
353  pxy->px_status < M0_PX_COMPLETE) {
354  /*
355  * Got a fresh "complete(fail,stop)" state - need to
356  * decrease counter
357  */
358  M0_LOG(M0_DEBUG, "Decrease proxy_nr (current nr %"
359  PRIu64") cm %p, pxy %p",
360  cm->cm_proxy_active_nr, cm, pxy);
362  } else if (pxy->px_status >= M0_PX_COMPLETE &&
363  px_status < M0_PX_COMPLETE) {
364  M0_LOG(M0_DEBUG, "Increase proxy_nr (current nr %"
365  PRIu64") cm %p, pxy %p",
366  cm->cm_proxy_active_nr, cm, pxy);
368  }
369 
370  rc = px_action[px_status](pxy, in_interval, out_interval, px_epoch, px_status);
373  /*
374  * All proxies finished processing, i.e. all proxies are in
375  * COMPLETE/STOP/FAIL state.
376  */
377  if (cm->cm_proxy_active_nr == 0) {
378  M0_LOG(M0_DEBUG, "No more active proxies in cm %p", cm);
379  m0_cm_notify(cm);
380  }
381  m0_cm_proxy_unlock(pxy);
382 
383  return M0_RC(rc);
384 }
385 
386 M0_INTERNAL bool m0_cm_proxy_is_updated(struct m0_cm_proxy *proxy,
387  struct m0_cm_sw *in_interval)
388 {
389  return m0_cm_ag_id_cmp(&in_interval->sw_hi,
390  &proxy->px_last_sw_onwire_sent.sw_hi) <= 0;
391 }
392 
394  struct m0_sm_ast *ast)
395 {
396  struct m0_cm_proxy *proxy = container_of(ast, struct m0_cm_proxy,
398  struct m0_cm *cm = proxy->px_cm;
399  struct m0_cm_sw in_interval;
400  struct m0_cm_sw out_interval;
401  M0_ENTRY();
402 
404 
405  m0_cm_ag_in_interval(cm, &in_interval);
407  m0_cm_ag_id_copy(&in_interval.sw_hi,
409  m0_cm_ag_out_interval(cm, &out_interval);
410  M0_LOG(M0_DEBUG, "proxy ep: %s, cm->cm_aggr_grps_in_nr %"PRIu64
411  " pending updates: %u posted: %" PRIu64 " state=%u"
412  " px_update_rc=%d px_send_final_update=%d",
413  proxy->px_endpoint,
415  proxy->px_updates_pending,
416  proxy->px_nr_updates_posted,
417  proxy->px_status,
418  proxy->px_update_rc,
419  !!proxy->px_send_final_update);
420  ID_LOG("proxy last updated hi", &proxy->px_last_sw_onwire_sent.sw_hi);
421 
422  /*
423  * We check if updates posted are greater than 0 and decrement as
424  * there could be a case of update resend while a reply is already
425  * on wire and a proxy may receive multiple replies for an update.
426  */
427  if (proxy->px_nr_updates_posted > 0)
429 
430  if (proxy->px_update_rc != 0 || proxy->px_send_final_update ||
431  (proxy->px_updates_pending > 0 &&
432  (!m0_cm_proxy_is_updated(proxy, &in_interval) || cm->cm_abort ||
433  cm->cm_quiesce))) {
434  if (proxy->px_update_rc == -ECANCELED ||
435  (M0_IN(proxy->px_status, (M0_PX_FAILED, M0_PX_STOP)) &&
436  !proxy->px_send_final_update))
437  proxy->px_updates_pending = 0;
438  else
439  m0_cm_proxy_remote_update(proxy, &in_interval, &out_interval);
440  }
441 
442  if (m0_cm_state_get(cm) == M0_CMS_READY &&
444  proxy->px_update_rc == 0) {
446  m0_bitmap_set(&cm->cm_proxy_update_map, proxy->px_id, true);
447  }
448 
449  /* Initial handshake complete, signal waiters to continue further.*/
452 
453  /*
454  * Handle service/node failure during sns-repair/rebalance.
455  * Cannot send updates to dead proxy, all the aggregation groups,
456  * frozen on that proxy must be destroyed.
457  */
458  if (proxy->px_status == M0_PX_FAILED || m0_cm_state_get(cm) == M0_CMS_FAIL ||
459  cm->cm_quiesce || cm->cm_abort) {
460  m0_cm_proxy_lock(proxy);
461  __wake_up_pending_cps(proxy);
462  m0_cm_proxy_unlock(proxy);
463  /* Here we have already received notification from HA about
464  * the proxy failure and might receive explicit abort command as well.
465  * So no need to transition cm to FAILED state, just aborting the
466  * operation would suffice.
467  */
468  m0_cm_abort(cm, 0);
469  m0_cm_frozen_ag_cleanup(cm, proxy);
470  }
471  if (cm->cm_done || proxy->px_status == M0_PX_FAILED ||
473  /* Wake up anyone waiting to handle further process (cleanup/completion). */
475  }
476  M0_LEAVE();
477 }
478 
479 static void proxy_sw_onwire_item_replied_cb(struct m0_rpc_item *req_item)
480 {
481  struct m0_cm_proxy_sw_onwire *swu_fop;
482  struct m0_cm_sw_onwire_rep *sw_rep;
483  struct m0_rpc_item *rep_item;
484  struct m0_cm_proxy *proxy;
485  struct m0_fop *rep_fop;
486 
487  M0_ENTRY("%p", req_item);
488 
489  swu_fop = M0_AMB(swu_fop, m0_rpc_item_to_fop(req_item), pso_fop);
490  proxy = swu_fop->pso_proxy;
491  M0_ASSERT(m0_cm_proxy_bob_check(proxy));
492 
493  if (req_item->ri_error == 0) {
494  rep_item = req_item->ri_reply;
495  if (m0_rpc_item_is_generic_reply_fop(rep_item))
496  proxy->px_update_rc = m0_rpc_item_generic_reply_rc(rep_item);
497  else {
498  rep_fop = m0_rpc_item_to_fop(rep_item);
499  sw_rep = m0_fop_data(rep_fop);
500  proxy->px_update_rc = sw_rep->swr_rc;
501  }
502  } else
503  proxy->px_update_rc = req_item->ri_error;
504 
507 
508  M0_LEAVE();
509 }
510 
513 };
514 
515 static void cm_proxy_sw_onwire_post(struct m0_cm_proxy *proxy,
516  struct m0_fop *fop,
517  const struct m0_rpc_conn *conn)
518 {
519  struct m0_rpc_item *item;
520 
521  M0_ENTRY("fop: %p conn: %p to pxy %p (%s)",
522  fop, conn, proxy, proxy->px_endpoint);
523  M0_PRE(fop != NULL && conn != NULL);
524 
528  item->ri_session = proxy->px_session;
529  item->ri_deadline = 0;
530 
532  m0_rpc_post(item);
534  M0_LEAVE();
535 }
536 
537 static void proxy_sw_onwire_release(struct m0_ref *ref)
538 {
540  struct m0_fop *fop;
541 
542  fop = container_of(ref, struct m0_fop, f_ref);
543  pso_fop = container_of(fop, struct m0_cm_proxy_sw_onwire, pso_fop);
544  M0_ASSERT(pso_fop != NULL);
545  m0_fop_fini(fop);
546  m0_free(pso_fop);
547 }
548 
549 M0_INTERNAL int m0_cm_proxy_remote_update(struct m0_cm_proxy *proxy,
550  struct m0_cm_sw *in_interval,
551  struct m0_cm_sw *out_interval)
552 {
553  struct m0_cm *cm;
554  struct m0_rpc_machine *rmach;
555  struct m0_rpc_conn *conn;
556  struct m0_cm_proxy_sw_onwire *sw_fop;
557  struct m0_fop *fop;
558  const char *ep;
559  int rc;
560 
561  M0_PRE(proxy != NULL);
562  M0_ENTRY("proxy: %p (%s)", proxy, proxy->px_endpoint);
563  cm = proxy->px_cm;
565 
566  if (proxy->px_nr_updates_posted > 0) {
568  return M0_RC(0);
569  }
570  M0_ALLOC_PTR(sw_fop);
571  if (sw_fop == NULL)
572  return M0_ERR(-ENOMEM);
573  fop = &sw_fop->pso_fop;
574  rmach = proxy->px_conn->c_rpc_machine;
575  ep = rmach->rm_tm.ntm_ep->nep_addr;
576  conn = proxy->px_conn;
579  proxy->px_id, ep, in_interval,
580  out_interval);
581  if (rc != 0) {
583  m0_free(sw_fop);
584  return M0_ERR(rc);
585  }
586 
587  if (proxy->px_send_final_update) {
588  struct m0_cm_sw_onwire *swo;
589  proxy->px_send_final_update = false;
590 
591  /* This is the final update. No more SW update will be sent.
592  * CM status must be set to STOP, so the proxy on remote node
593  * can be finalized.
594  */
595  swo = m0_fop_data(fop);
596  swo->swo_cm_status = M0_PX_STOP;
597  }
598 
599  sw_fop->pso_proxy = proxy;
600  ID_LOG("proxy last updated hi", &proxy->px_last_sw_onwire_sent.sw_hi);
601 
603  m0_cm_sw_copy(&proxy->px_last_sw_onwire_sent, in_interval);
604 
605  M0_LOG(M0_DEBUG, "Sending to %s hi: ["M0_AG_F"]",
606  proxy->px_endpoint, M0_AG_P(&in_interval->sw_hi));
607 
608  return M0_RC(0);
609 }
610 
611 M0_INTERNAL bool m0_cm_proxy_is_done(const struct m0_cm_proxy *pxy)
612 {
613  M0_LOG(M0_DEBUG, "proxy %p (to %s) state: is_done %d "
614  "px_nr_updates_posted %" PRIu64 " onwire_ast.sa_next %p",
615  pxy, pxy->px_endpoint,
616  pxy->px_is_done, pxy->px_nr_updates_posted,
618 
619  return pxy->px_is_done && pxy->px_nr_updates_posted == 0 &&
620  pxy->px_sw_onwire_ast.sa_next == NULL;
621 }
622 
623 M0_INTERNAL void m0_cm_proxy_fini(struct m0_cm_proxy *pxy)
624 {
625  M0_ENTRY("%p", pxy);
626  M0_PRE(pxy != NULL);
627  M0_PRE(proxy_cp_tlist_is_empty(&pxy->px_pending_cps));
628 
629  proxy_cp_tlist_fini(&pxy->px_pending_cps);
630  m0_cm_proxy_bob_fini(pxy);
631  if (m0_clink_is_armed(&pxy->px_ha_link)) {
633  m0_clink_fini(&pxy->px_ha_link);
634  }
635  m0_mutex_fini(&pxy->px_mutex);
636  M0_LEAVE();
637 }
638 
639 M0_INTERNAL uint64_t m0_cm_proxy_nr(struct m0_cm *cm)
640 {
642 
643  return proxy_tlist_length(&cm->cm_proxies);
644 }
645 
646 M0_INTERNAL bool m0_cm_proxy_agid_is_in_sw(struct m0_cm_proxy *pxy,
647  struct m0_cm_ag_id *id)
648 {
649  bool result;
650 
651  m0_cm_proxy_lock(pxy);
652  result = m0_cm_ag_id_cmp(id, &pxy->px_sw.sw_lo) >= 0 &&
653  m0_cm_ag_id_cmp(id, &pxy->px_sw.sw_hi) <= 0;
654  m0_cm_proxy_unlock(pxy);
655 
656  return result;
657 }
658 
659 M0_INTERNAL void m0_cm_proxy_pending_cps_wakeup(struct m0_cm *cm)
660 {
661  struct m0_cm_proxy *pxy;
662 
663  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
665  } m0_tl_endfor;
666 }
667 
668 static void px_fail_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
669 {
670  struct m0_cm_proxy *pxy = container_of(ast, struct m0_cm_proxy,
671  px_fail_ast);
672  M0_ENTRY("pxy %p %s failed", pxy, pxy->px_endpoint);
673 
674  m0_cm_proxy_lock(pxy);
675  pxy->px_status = M0_PX_FAILED;
676  pxy->px_is_done = true;
677  if (!proxy_fail_tlink_is_in(pxy))
678  proxy_fail_tlist_add_tail(&pxy->px_cm->cm_failed_proxies, pxy);
680  m0_cm_proxy_unlock(pxy);
681  m0_cm_abort(pxy->px_cm, 0);
682  m0_cm_frozen_ag_cleanup(pxy->px_cm, pxy);
684  M0_LEAVE();
685 }
686 
687 static void px_online_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
688 {
689  struct m0_cm_proxy *pxy = container_of(ast, struct m0_cm_proxy,
690  px_online_ast);
691 
692  /*
693  * Do nothing for now, ongoing sns operation must cleanup and complete
694  * and sns repair/rebalance must be restarted.
695  */
696  M0_LOG(M0_DEBUG, "proxy %s is online", pxy->px_endpoint);
697 }
698 
699 static bool proxy_clink_cb(struct m0_clink *clink)
700 {
701  struct m0_cm_proxy *pxy = M0_AMB(pxy, clink, px_ha_link);
702  struct m0_conf_obj *svc_obj = container_of(clink->cl_chan,
703  struct m0_conf_obj,
704  co_ha_chan);
705  struct m0_sm_ast *ast = NULL;
706 
708 
709  if (M0_IN(svc_obj->co_ha_state, (M0_NC_FAILED, M0_NC_TRANSIENT))) {
711  ast = &pxy->px_fail_ast;
712  } else if (svc_obj->co_ha_state == M0_NC_ONLINE &&
713  pxy->px_status == M0_PX_FAILED) {
715  ast = &pxy->px_online_ast;
716  }
718 
719  return true;
720 }
721 
722 M0_INTERNAL void m0_cm_proxy_event_handle_register(struct m0_cm_proxy *pxy,
723  struct m0_conf_obj *svc_obj)
724 {
726  m0_clink_add_lock(&svc_obj->co_ha_chan, &pxy->px_ha_link);
727 }
728 
729 M0_INTERNAL bool m0_cm_proxy_is_locked(struct m0_cm_proxy *pxy)
730 {
731  return m0_mutex_is_locked(&pxy->px_mutex);
732 }
733 
734 M0_INTERNAL void m0_cm_proxy_lock(struct m0_cm_proxy *pxy)
735 {
736  m0_mutex_lock(&pxy->px_mutex);
737 }
738 
739 M0_INTERNAL void m0_cm_proxy_unlock(struct m0_cm_proxy *pxy)
740 {
741  m0_mutex_unlock(&pxy->px_mutex);
742 }
743 
744 M0_INTERNAL bool m0_cm_proxies_ready(const struct m0_cm *cm)
745 {
746  uint32_t nr_failed_proxies;
747 
749 
750  nr_failed_proxies = proxy_fail_tlist_length(&cm->cm_failed_proxies);
751  return cm->cm_nr_proxy_updated == (cm->cm_proxy_nr - nr_failed_proxies) * 2;
752 }
753 
754 M0_INTERNAL int m0_cm_proxy_in_count_alloc(struct m0_cm_proxy_in_count *pcount,
755  uint32_t nr_proxies)
756 {
757  M0_PRE(nr_proxies > 0);
758 
759  M0_ALLOC_ARR(pcount->p_count, nr_proxies);
760  if (pcount->p_count == NULL)
761  return -ENOMEM;
762  pcount->p_nr = nr_proxies;
763 
764  return 0;
765 }
766 
767 M0_INTERNAL void m0_cm_proxy_in_count_free(struct m0_cm_proxy_in_count *pcount)
768 {
769  M0_PRE(pcount != NULL);
770 
771  m0_free(pcount->p_count);
772  pcount->p_count = NULL;
773  pcount->p_nr = 0;
774 }
775 
776 M0_INTERNAL void m0_cm_proxies_sent_reset(struct m0_cm *cm)
777 {
778  struct m0_cm_proxy *pxy;
779 
780  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
782  } m0_tl_endfor;
783 }
784 
785 #undef M0_TRACE_SUBSYSTEM
786 
788 /*
789  * Local variables:
790  * c-indentation-style: "K&R"
791  * c-basic-offset: 8
792  * tab-width: 8
793  * fill-column: 80
794  * scroll-step: 1
795  * End:
796  */
M0_INTERNAL void m0_cm_ag_id_copy(struct m0_cm_ag_id *dst, const struct m0_cm_ag_id *src)
Definition: ag.c:83
const struct m0_conf_obj_type * m0_conf_obj_type(const struct m0_conf_obj *obj)
Definition: obj.c:363
uint64_t cm_aggr_grps_in_nr
Definition: cm.h:205
static void proxy_sw_onwire_release(struct m0_ref *ref)
Definition: proxy.c:537
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
static const struct m0_bob_type proxy_bob
Definition: proxy.c:71
static struct m0_addb2_philter p
Definition: consumer.c:40
M0_INTERNAL bool m0_cm_proxy_is_done(const struct m0_cm_proxy *pxy)
Definition: proxy.c:611
M0_INTERNAL void m0_cm_frozen_ag_cleanup(struct m0_cm *cm, struct m0_cm_proxy *proxy)
Definition: cm.c:1148
static void px_fail_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: proxy.c:668
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0_mutex px_mutex
Definition: proxy.h:112
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
const struct m0_rpc_item_ops proxy_sw_onwire_item_ops
Definition: proxy.c:511
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
Definition: sw.h:45
#define NULL
Definition: misc.h:38
#define M0_AG_P(ag)
Definition: ag.h:55
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
#define ID_LOG(prefix, id)
Definition: ag.h:57
struct m0_bitmap cm_proxy_update_map
Definition: cm.h:252
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
M0_INTERNAL void m0_cm_proxy_lock(struct m0_cm_proxy *pxy)
Definition: proxy.c:734
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
bool m0_rpc_item_is_generic_reply_fop(const struct m0_rpc_item *item)
Definition: fom_generic.c:75
bool cm_quiesce
Definition: cm.h:277
const struct m0_cm_ops * cm_ops
Definition: cm.h:188
static void proxy_sw_onwire_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: proxy.c:393
struct m0_cm_ag_id sw_hi
Definition: sw.h:47
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_TL_DEFINE(proxy, M0_INTERNAL, struct m0_cm_proxy)
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
Definition: cp.h:160
M0_LEAVE()
struct m0_cm_sw px_last_sw_onwire_sent
Definition: proxy.h:71
const struct m0_conf_obj_type M0_CONF_SERVICE_TYPE
Definition: service.c:156
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL void m0_cm_proxy_unlock(struct m0_cm_proxy *pxy)
Definition: proxy.c:739
bool cm_done
Definition: cm.h:265
struct m0_tl px_pending_cps
Definition: proxy.h:110
static bool epoch_check(struct m0_cm_proxy *pxy, m0_time_t px_epoch)
Definition: proxy.c:199
M0_INTERNAL bool m0_cm_proxies_ready(const struct m0_cm *cm)
Definition: proxy.c:744
struct m0_cm_ag_id cag_id
Definition: ag.h:72
int32_t ri_error
Definition: item.h:161
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:219
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
M0_INTERNAL void m0_cm_ag_in_interval(const struct m0_cm *cm, struct m0_cm_sw *in_interval)
Definition: ag.c:436
Definition: sm.h:504
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL bool m0_cm_proxy_is_updated(struct m0_cm_proxy *proxy, struct m0_cm_sw *in_interval)
Definition: proxy.c:386
int px_update_rc
Definition: proxy.h:98
uint32_t * p_count
Definition: proxy.h:154
int(* cmo_sw_onwire_fop_setup)(struct m0_cm *cm, struct m0_fop *fop, void(*fop_release)(struct m0_ref *), uint64_t proxy_id, const char *local_ep, const struct m0_cm_sw *sw, const struct m0_cm_sw *out_interval)
Definition: cm.h:342
M0_INTERNAL bool m0_cm_ag_id_is_set(const struct m0_cm_ag_id *id)
Definition: ag.c:95
static struct m0_rpc_item * item
Definition: item.c:56
static int px_ready(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:247
const char * bt_name
Definition: bob.h:73
struct m0_rpc_session * px_session
Definition: proxy.h:116
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
static struct m0_cm * cm
Definition: cm.c:63
Definition: sock.c:754
struct m0_tl cm_proxies
Definition: cm.h:246
uint64_t cm_nr_proxy_updated
Definition: cm.h:253
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
#define M0_AG_F
Definition: ag.h:54
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
M0_INTERNAL void m0_cm_complete_notify(struct m0_cm *cm)
Definition: cm.c:1133
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
Definition: chan.c:159
M0_INTERNAL bool m0_cm_proxy_agid_is_in_sw(struct m0_cm_proxy *pxy, struct m0_cm_ag_id *id)
Definition: proxy.c:646
#define PRIu64
Definition: types.h:58
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
static void proxy_done(struct m0_cm_proxy *proxy)
Definition: proxy.c:213
M0_INTERNAL void m0_cm_proxy_cp_add(struct m0_cm_proxy *pxy, struct m0_cm_cp *cp)
Definition: proxy.c:139
return M0_ERR(-EOPNOTSUPP)
struct m0_clink px_ha_link
Definition: proxy.h:134
M0_INTERNAL void m0_cm_proxies_sent_reset(struct m0_cm *cm)
Definition: proxy.c:776
M0_INTERNAL void m0_cm_proxy_event_handle_register(struct m0_cm_proxy *pxy, struct m0_conf_obj *svc_obj)
Definition: proxy.c:722
M0_INTERNAL void m0_cm_sw_copy(struct m0_cm_sw *dst, const struct m0_cm_sw *src)
Definition: sw.c:67
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
enum m0_proxy_state px_status
Definition: proxy.h:91
struct m0_sm_ast px_online_ast
Definition: proxy.h:89
const char * px_endpoint
Definition: proxy.h:118
static int px_active(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:277
static int px_complete(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:291
M0_INTERNAL int m0_cm_ag_id_cmp(const struct m0_cm_ag_id *id0, const struct m0_cm_ag_id *id1)
Definition: ag.c:73
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
#define M0_ASSERT(cond)
static void px_online_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: proxy.c:687
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_TL_DESCR_DEFINE(proxy, "copy machine proxy", M0_INTERNAL, struct m0_cm_proxy, px_linkage, px_magic, CM_PROXY_LINK_MAGIC, CM_PROXY_HEAD_MAGIC)
uint64_t px_nr_updates_posted
Definition: proxy.h:95
struct m0_rpc_conn * px_conn
Definition: proxy.h:114
enum m0_ha_obj_state co_ha_state
Definition: obj.h:241
struct m0_cm_sw px_sw
Definition: proxy.h:68
static bool cm_proxy_invariant(const struct m0_cm_proxy *pxy)
Definition: proxy.c:80
struct m0_cm_ag_id sw_lo
Definition: sw.h:46
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
#define M0_POST(cond)
Definition: xcode.h:73
M0_INTERNAL void m0_cm_proxy_del(struct m0_cm *cm, struct m0_cm_proxy *pxy)
Definition: proxy.c:124
struct m0_chan co_ha_chan
Definition: obj.h:248
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
Definition: bitmap.c:139
bool px_is_done
Definition: proxy.h:93
M0_INTERNAL bool m0_cm_proxy_is_locked(struct m0_cm_proxy *pxy)
Definition: proxy.c:729
struct m0_cm_aggr_group * c_ag
Definition: cp.h:172
static bool proxy_clink_cb(struct m0_clink *clink)
Definition: proxy.c:699
struct m0_sm_ast * sa_next
Definition: sm.h:509
M0_INTERNAL int m0_cm_proxy_update(struct m0_cm_proxy *pxy, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, uint32_t px_status, m0_time_t px_epoch)
Definition: proxy.c:323
struct m0_rpc_conn conn
Definition: fsync.c:96
static void proxy_sw_onwire_item_replied_cb(struct m0_rpc_item *req_item)
Definition: proxy.c:479
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:135
struct m0_sm_ast px_fail_ast
Definition: proxy.h:87
static struct m0_clink clink[RDWR_REQUEST_MAX]
uint64_t px_id
Definition: proxy.h:63
M0_INTERNAL uint64_t m0_cm_proxy_nr(struct m0_cm *cm)
Definition: proxy.c:639
struct m0_chan cm_proxy_init_wait
Definition: cm.h:240
struct m0_cm_proxy * pso_proxy
Definition: proxy.h:167
static int px_stop_fail(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:301
static void __wake_up_pending_cps(struct m0_cm_proxy *pxy)
Definition: proxy.c:187
bool px_send_final_update
Definition: proxy.h:142
M0_INTERNAL void m0_cm_proxy_in_count_free(struct m0_cm_proxy_in_count *pcount)
Definition: proxy.c:767
uint32_t p_nr
Definition: proxy.h:152
struct m0_sm_group cm_sm_group
Definition: cm.h:185
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
struct m0_cm_sw px_out_interval
Definition: proxy.h:77
uint64_t cm_proxy_nr
Definition: cm.h:250
#define M0_MAGIX_OFFSET(type, field)
Definition: misc.h:356
char * ep
Definition: sw.h:132
#define M0_CNT_INC(cnt)
Definition: arith.h:226
M0_INTERNAL void m0_cm_notify(struct m0_cm *cm)
Definition: cm.c:1081
struct m0_ref f_ref
Definition: fop.h:81
struct m0_net_end_point * ntm_ep
Definition: net.h:868
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
M0_INTERNAL void m0_cm_ag_out_interval(const struct m0_cm *cm, struct m0_cm_sw *out_interval)
Definition: ag.c:454
M0_INTERNAL void m0_cm_proxy_fini(struct m0_cm_proxy *pxy)
Definition: proxy.c:623
static void cm_proxy_sw_onwire_post(struct m0_cm_proxy *proxy, struct m0_fop *fop, const struct m0_rpc_conn *conn)
Definition: proxy.c:515
uint64_t cm_proxy_active_nr
Definition: cm.h:254
M0_INTERNAL int m0_cm_proxy_init(struct m0_cm_proxy *proxy, uint64_t px_id, struct m0_cm_ag_id *lo, struct m0_cm_ag_id *hi, const char *endpoint)
Definition: proxy.c:90
M0_INTERNAL int m0_cm_proxy_in_count_alloc(struct m0_cm_proxy_in_count *pcount, uint32_t nr_proxies)
Definition: proxy.c:754
struct m0_rpc_session * ri_session
Definition: item.h:147
Definition: cm.h:166
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:337
M0_INTERNAL int m0_net_ip_parse(const char *name, struct m0_net_ip_addr *addr)
Definition: ip.c:350
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
M0_BOB_DEFINE(static, &proxy_bob, m0_cm_proxy)
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL struct m0_cm_proxy * m0_cm_proxy_locate(struct m0_cm *cm, const char *addr)
Definition: proxy.c:161
M0_INTERNAL enum m0_cm_state m0_cm_state_get(const struct m0_cm *cm)
Definition: cm.c:565
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static void cm_proxy_cp_del(struct m0_cm_proxy *pxy, struct m0_cm_cp *cp)
Definition: proxy.c:152
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:198
struct m0_fop pso_fop
Definition: proxy.h:162
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL void m0_cm_proxy_pending_cps_wakeup(struct m0_cm *cm)
Definition: proxy.c:659
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:345
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
struct m0_fom c_fom
Definition: cp.h:161
uint32_t swo_cm_status
Definition: sw.h:75
static void _sw_update(struct m0_cm_proxy *pxy, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, uint32_t px_status)
Definition: proxy.c:232
M0_INTERNAL void m0_cm_abort(struct m0_cm *cm, int rc)
Definition: cm.c:1181
M0_INTERNAL bool m0_cm_is_locked(const struct m0_cm *cm)
Definition: cm.c:560
struct m0_tl cm_failed_proxies
Definition: cm.h:248
struct m0_sm_ast px_sw_onwire_ast
Definition: proxy.h:85
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
m0_time_t px_epoch
Definition: proxy.h:65
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL bool m0_cm_sw_cmp(const struct m0_cm_sw *sw0, const struct m0_cm_sw *sw1)
Definition: sw.c:51
M0_INTERNAL int m0_cm_proxy_remote_update(struct m0_cm_proxy *proxy, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval)
Definition: proxy.c:549
uint32_t px_updates_pending
Definition: proxy.h:100
M0_INTERNAL void m0_cm_proxy_add(struct m0_cm *cm, struct m0_cm_proxy *pxy)
Definition: proxy.c:110
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
int32_t rc
Definition: trigger_fop.h:47
bool cm_abort
Definition: cm.h:282
static void hi(void)
Definition: nucleus.c:93
Definition: ag.h:49
Definition: fop.h:80
Definition: trace.h:478
struct m0_cm_ag_id cm_sw_last_updated_hi
Definition: cm.h:220
struct m0_fop * rep_fop
Definition: dir.c:334
struct m0_cm * px_cm
Definition: proxy.h:103
m0_time_t ri_deadline
Definition: item.h:141
static int(* px_action[])(struct m0_cm_proxy *px, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:313
M0_INTERNAL bool m0_net_ip_addr_eq(const struct m0_net_ip_addr *addr1, const struct m0_net_ip_addr *addr2, bool is_ncmp)
Definition: ip.c:471