Source code for infrahouse_toolkit.cli.ih_elastic.cmd_cluster.cmd_commision_node
"""
.. topic:: ``ih-elastic cluster commission-node``
A ``ih-elastic cluster commission-node`` subcommand.
See ``ih-elastic cluster commission-node --help`` for more details.
"""
import json
import sys
from logging import getLogger
from time import sleep
from typing import Optional

import click
from botocore.exceptions import ClientError
from elasticsearch.client import ClusterClient
from infrahouse_core.aws.asg import ASG
from infrahouse_core.aws.asg_instance import ASGInstance

from infrahouse_toolkit.timeout import timeout
# Module logger. ``getLogger()`` called without a name returns the logging root logger.
LOG = getLogger()
def wait_until_complete(
    client: ClusterClient, hook_name: Optional[str] = None, wait_time: int = 48 * 3600
):
    """
    Give up to ``wait_time`` seconds to Elasticsearch to finish relocating shards
    after this node joins the cluster.

    Cluster health is polled (sleeping three seconds between checks) until it reports
    zero relocating shards. While waiting, if ``hook_name`` is given and the local
    instance is in the ``Pending:Wait`` lifecycle state, a lifecycle action heartbeat
    is recorded for that hook.

    On timeout the error is logged, the lifecycle hook (if any) is completed with
    ``ABANDON``, instance refreshes in the autoscaling group are cancelled, and the
    ``TimeoutError`` is re-raised.

    :param client: a cluster Elasticsearch client.
    :type client: ClusterClient
    :param hook_name: Lifecycle hook name to extend while waiting and to abandon on timeout.
    :type hook_name: str
    :param wait_time: Time in seconds to wait until the cluster finishes shard relocation.
        A falsy value (e.g. ``0``) skips waiting altogether.
    :type wait_time: int
    :raise TimeoutError: if after ``wait_time`` seconds the cluster still reports relocating shards.
    """
    if not wait_time:
        LOG.warning("wait_time = %r, not waiting for Elasticsearch onboard this node.", wait_time)
        return

    # Only look up the local instance / autoscaling group when we actually wait,
    # since they are needed solely for heartbeats and for cleanup on timeout.
    local_instance = ASGInstance()
    asg = ASG(asg_name=local_instance.asg_name)
    try:
        with timeout(wait_time):
            while True:
                health = client.health()
                LOG.info(
                    "Current cluster state:\n %s",
                    json.dumps(health.body, indent=4),
                )
                if health.body["relocating_shards"] == 0:
                    break
                if hook_name and local_instance.lifecycle_state == "Pending:Wait":
                    LOG.debug("Extend lifecycle hook %s", hook_name)
                    asg.record_lifecycle_action_heartbeat(hook_name=hook_name)
                sleep(3)
    except TimeoutError as err:
        # Log before abandoning the hook: complete_lifecycle_action() calls
        # sys.exit(1) when AWS rejects the call, which would otherwise lose the reason.
        LOG.error(err)
        if hook_name:
            complete_lifecycle_action(hook_name, result="ABANDON")
        asg.cancel_instance_refresh()
        raise
def complete_lifecycle_action(hook_name, result="CONTINUE"):
    """
    Complete lifecycle hook ``hook_name`` in the autoscaling group of this instance.

    If the AWS call raises ``ClientError``, the error is logged, instance refreshes
    in the autoscaling group are cancelled, and the process exits with status 1.

    :param hook_name: Name of the lifecycle hook to complete.
    :type hook_name: str
    :param result: Lifecycle action result, e.g. ``"CONTINUE"`` (default) or ``"ABANDON"``.
    :type result: str
    """
    group_name = ASGInstance().asg_name
    autoscaling_group = ASG(asg_name=group_name)
    try:
        autoscaling_group.complete_lifecycle_action(hook_name=hook_name, result=result)
    except ClientError as client_error:
        LOG.error(client_error)
        autoscaling_group.cancel_instance_refresh()
        sys.exit(1)
@click.command(name="commission-node")
@click.option(
    "--wait-until-complete",
    help="Wait this many seconds until Elasticsearch finishes relocating shards after this node joins.",
    default=48 * 3600,
    type=click.INT,
    show_default=True,
)
@click.option(
    "--complete-lifecycle-action",
    metavar="HOOK_NAME",
    help="Name of the lifecycle hook to complete when the node is fully provisioned.",
    default=None,
    show_default=True,
)
@click.pass_context
def cmd_commission_node(ctx, **kwargs):
    """
    Ensures the local node fully provisions. When the node is fully operational, optionally
    complete a lifecycle hook.
    """
    hook_name = kwargs["complete_lifecycle_action"]
    wait_time = kwargs["wait_until_complete"]
    # ctx.obj["es"] is expected to hold the Elasticsearch connection set up by the parent group.
    client = ClusterClient(ctx.obj["es"])

    health = client.health()
    LOG.info("Checking %s cluster health: %s", health.body["cluster_name"], health.body["status"])
    LOG.debug("Cluster health %s", json.dumps(health.body, indent=4))

    # Trigger re-balancing check; retry_failed=True also retries shard allocations
    # that were previously blocked after too many failed attempts.
    client.reroute(retry_failed=True)

    wait_until_complete(client, hook_name, wait_time=wait_time)
    if hook_name:
        complete_lifecycle_action(hook_name=hook_name)