/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node,
			      int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node,
			    int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res,
				void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
				struct dlm_master_list_entry *mle,
				const char *name,
				unsigned int namelen)
{
	if (dlm != mle->dlm)
		return 0;

	if (namelen != mle->mnamelen ||
	    memcmp(name, mle->mname, namelen) != 0)
		return 0;

	return 1;
}

static struct kmem_cache *dlm_lockres_cache;
static struct kmem_cache *dlm_lockname_cache;
static struct kmem_cache *dlm_mle_cache;

static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res,
				       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res);


int dlm_is_host_down(int errno)
{
	switch (errno) {
		case -EBADF:
		case -ECONNREFUSED:
		case -ENOTCONN:
		case -ECONNRESET:
		case -EPIPE:
		case -EHOSTDOWN:
		case -EHOSTUNREACH:
		case -ETIMEDOUT:
		case -ECONNABORTED:
		case -ENETDOWN:
		case -ENETUNREACH:
		case -ENETRESET:
		case -ESHUTDOWN:
		case -ENOPROTOOPT:
		case -EINVAL:   /* if returned from our tcp code,
				   this means there is no socket */
			return 1;
	}
	return 0;
}
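
/* A rough sketch of how this helper is used elsewhere in this file
 * (illustrative, not copied from any one call site): a network send
 * that fails with one of the errnos above is treated as the remote
 * node having died, and cleanup is left to heartbeat and recovery
 * rather than blind retry.
 *
 *	ret = o2net_send_message(..., to, &status);
 *	if (ret < 0 && dlm_is_host_down(ret)) {
 *		// "to" is presumed dead; heartbeat callbacks and
 *		// the recovery thread will clean up after it
 *	}
 */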


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);

	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	if (!list_empty(&mle->hb_events))
		list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					    struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	__dlm_mle_detach_hb_events(dlm, mle);
	spin_unlock(&dlm->spinlock);
}
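
/* Putting the rules above together, the expected mle lifecycle looks
 * roughly like this (a sketch, not literal code from one function):
 *
 *	spin_lock(&dlm->spinlock);
 *	dlm_init_mle(mle, ...);		// attaches hb events internally
 *	spin_unlock(&dlm->spinlock);
 *	...master request / response traffic...
 *	dlm_mle_detach_hb_events(dlm, mle);	// once the master is known
 *	dlm_put_mle(mle);			// may free the mle
 */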

static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	mle->inuse++;
	kref_get(&mle->mle_refs);
}

static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	mle->inuse--;
	__dlm_put_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	if (!kref_read(&mle->mle_refs)) {
		/* this may or may not crash, but who cares.
		 * it's a BUG. */
		mlog(ML_ERROR, "bad mle: %p\n", mle);
		dlm_print_one_mle(mle);
		BUG();
	} else
		kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	__dlm_put_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
}
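
/* Note the lock ordering used above and throughout this file:
 * dlm->spinlock is taken before dlm->master_lock, and mle->spinlock
 * nests inside both.  __dlm_put_mle() is the flavor for callers that
 * already hold the two outer locks; dlm_put_mle() acquires and drops
 * them itself. */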

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
	kref_get(&mle->mle_refs);
}

static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen)
{
	assert_spin_locked(&dlm->spinlock);

	mle->dlm = dlm;
	mle->type = type;
	INIT_HLIST_NODE(&mle->master_hash_node);
	INIT_LIST_HEAD(&mle->hb_events);
	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
	spin_lock_init(&mle->spinlock);
	init_waitqueue_head(&mle->wq);
	atomic_set(&mle->woken, 0);
	kref_init(&mle->mle_refs);
	memset(mle->response_map, 0, sizeof(mle->response_map));
	mle->master = O2NM_MAX_NODES;
	mle->new_master = O2NM_MAX_NODES;
	mle->inuse = 0;

	BUG_ON(mle->type != DLM_MLE_BLOCK &&
	       mle->type != DLM_MLE_MASTER &&
	       mle->type != DLM_MLE_MIGRATION);

	if (mle->type == DLM_MLE_MASTER) {
		BUG_ON(!res);
		mle->mleres = res;
		memcpy(mle->mname, res->lockname.name, res->lockname.len);
		mle->mnamelen = res->lockname.len;
		mle->mnamehash = res->lockname.hash;
	} else {
		BUG_ON(!name);
		mle->mleres = NULL;
		memcpy(mle->mname, name, namelen);
		mle->mnamelen = namelen;
		mle->mnamehash = dlm_lockid_hash(name, namelen);
	}

	atomic_inc(&dlm->mle_tot_count[mle->type]);
	atomic_inc(&dlm->mle_cur_count[mle->type]);

	/* copy off the node_map and register hb callbacks on our copy */
	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
	clear_bit(dlm->node_num, mle->vote_map);
	clear_bit(dlm->node_num, mle->node_map);

	/* attach the mle to the domain node up/down events */
	__dlm_mle_attach_hb_events(dlm, mle);
}
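
/* A note on the bitmaps initialized above: node_map tracks which nodes
 * are currently up (kept current by the heartbeat callbacks attached
 * here), while vote_map is the set of nodes whose master-request
 * responses must still be collected.  Both begin as copies of the
 * domain map with the local node cleared, and dlm_wait_for_lock_mastery()
 * compares vote_map against response_map to decide when voting is
 * done. */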

void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	if (!hlist_unhashed(&mle->master_hash_node))
		hlist_del_init(&mle->master_hash_node);
}

void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
	struct hlist_head *bucket;

	assert_spin_locked(&dlm->master_lock);

	bucket = dlm_master_hash(dlm, mle->mnamehash);
	hlist_add_head(&mle->master_hash_node, bucket);
}

/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen)
{
	struct dlm_master_list_entry *tmpmle;
	struct hlist_head *bucket;
	unsigned int hash;

	assert_spin_locked(&dlm->master_lock);

	hash = dlm_lockid_hash(name, namelen);
	bucket = dlm_master_hash(dlm, hash);
	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
			continue;
		dlm_get_mle(tmpmle);
		*mle = tmpmle;
		return 1;
	}
	return 0;
}

void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
	struct dlm_master_list_entry *mle;

	assert_spin_locked(&dlm->spinlock);

	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
		if (node_up)
			dlm_mle_node_up(dlm, mle, NULL, idx);
		else
			dlm_mle_node_down(dlm, mle, NULL, idx);
	}
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (!test_bit(idx, mle->node_map))
		mlog(0, "node %u already removed from nodemap!\n", idx);
	else
		clear_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (test_bit(idx, mle->node_map))
		mlog(0, "node %u already in node map!\n", idx);
	else
		set_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
					  sizeof(struct dlm_master_list_entry),
					  0, SLAB_HWCACHE_ALIGN,
					  NULL);
	if (dlm_mle_cache == NULL)
		return -ENOMEM;
	return 0;
}

void dlm_destroy_mle_cache(void)
{
	if (dlm_mle_cache)
		kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
	struct dlm_master_list_entry *mle;
	struct dlm_ctxt *dlm;

	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
	     mle->type);

	/* remove from list if not already */
	__dlm_unlink_mle(dlm, mle);

	/* detach the mle from the domain node up/down events */
	__dlm_mle_detach_hb_events(dlm, mle);

	atomic_dec(&dlm->mle_cur_count[mle->type]);

	/* NOTE: kfree under spinlock here.
	 * if this is bad, we can move this to a freelist. */
	kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

int dlm_init_master_caches(void)
{
	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
					      sizeof(struct dlm_lock_resource),
					      0, SLAB_HWCACHE_ALIGN, NULL);
	if (!dlm_lockres_cache)
		goto bail;

	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
					       DLM_LOCKID_NAME_MAX, 0,
					       SLAB_HWCACHE_ALIGN, NULL);
	if (!dlm_lockname_cache)
		goto bail;

	return 0;
bail:
	dlm_destroy_master_caches();
	return -ENOMEM;
}

void dlm_destroy_master_caches(void)
{
	if (dlm_lockname_cache) {
		kmem_cache_destroy(dlm_lockname_cache);
		dlm_lockname_cache = NULL;
	}

	if (dlm_lockres_cache) {
		kmem_cache_destroy(dlm_lockres_cache);
		dlm_lockres_cache = NULL;
	}
}

static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm;

	res = container_of(kref, struct dlm_lock_resource, refs);
	dlm = res->dlm;

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	atomic_dec(&dlm->res_cur_count);

	if (!hlist_unhashed(&res->hash_node) ||
	    !list_empty(&res->granted) ||
	    !list_empty(&res->converting) ||
	    !list_empty(&res->blocked) ||
	    !list_empty(&res->dirty) ||
	    !list_empty(&res->recovering) ||
	    !list_empty(&res->purge)) {
		mlog(ML_ERROR,
		     "Going to BUG for resource %.*s."
		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
		     res->lockname.len, res->lockname.name,
		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
		     !list_empty(&res->granted) ? 'G' : ' ',
		     !list_empty(&res->converting) ? 'C' : ' ',
		     !list_empty(&res->blocked) ? 'B' : ' ',
		     !list_empty(&res->dirty) ? 'D' : ' ',
		     !list_empty(&res->recovering) ? 'R' : ' ',
		     !list_empty(&res->purge) ? 'P' : ' ');

		dlm_print_one_lock_resource(res);
	}

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!hlist_unhashed(&res->hash_node));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

	kmem_cache_free(dlm_lockres_cache, res);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */

	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = dlm_lockid_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_HLIST_NODE(&res->hash_node);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);
	INIT_LIST_HEAD(&res->tracking);
	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;
	res->inflight_locks = 0;
	res->inflight_assert_workers = 0;

	res->dlm = dlm;

	kref_init(&res->refs);

	atomic_inc(&dlm->res_tot_count);
	atomic_inc(&dlm->res_cur_count);

	/* just for consistency */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	res->last_used = 0;

	spin_lock(&dlm->spinlock);
	list_add_tail(&res->tracking, &dlm->tracking_list);
	spin_unlock(&dlm->spinlock);

	memset(res->lvb, 0, DLM_LVB_LEN);
	memset(res->refmap, 0, sizeof(res->refmap));
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
				   const char *name,
				   unsigned int namelen)
{
	struct dlm_lock_resource *res = NULL;

	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
	if (!res)
		goto error;

	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
	if (!res->lockname.name)
		goto error;

	dlm_init_lockres(dlm, res, name, namelen);
	return res;

error:
	if (res)
		kmem_cache_free(dlm_lockres_cache, res);
	return NULL;
}

void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res, int bit)
{
	assert_spin_locked(&res->spinlock);

	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
	     res->lockname.name, bit, __builtin_return_address(0));

	set_bit(bit, res->refmap);
}

void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res, int bit)
{
	assert_spin_locked(&res->spinlock);

	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
	     res->lockname.name, bit, __builtin_return_address(0));

	clear_bit(bit, res->refmap);
}
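
/* The refmap manipulated above records, one bit per node, which nodes
 * are believed to hold a reference on this lock resource.  Roughly:
 * the master sets a node's bit when that node gains interest in the
 * lockres, and the bit is cleared again through the deref path (see
 * dlm_deref_lockres_worker()) before the resource can be purged. */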

static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	res->inflight_locks++;

	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
	     res->lockname.len, res->lockname.name, res->inflight_locks,
	     __builtin_return_address(0));
}

void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	__dlm_lockres_grab_inflight_ref(dlm, res);
}

void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);

	BUG_ON(res->inflight_locks == 0);

	res->inflight_locks--;

	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
	     res->lockname.len, res->lockname.name, res->inflight_locks,
	     __builtin_return_address(0));

	wake_up(&res->wq);
}
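
/* inflight_locks acts as a simple pin count: while it is nonzero the
 * lockres should not be purged.  A typical pairing looks like this
 * sketch (both helpers expect res->spinlock to be held):
 *
 *	spin_lock(&res->spinlock);
 *	dlm_lockres_grab_inflight_ref(dlm, res);
 *	spin_unlock(&res->spinlock);
 *	...use the resource...
 *	spin_lock(&res->spinlock);
 *	dlm_lockres_drop_inflight_ref(dlm, res);	// wakes res->wq
 *	spin_unlock(&res->spinlock);
 */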

void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	res->inflight_assert_workers++;
	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
			dlm->name, res->lockname.len, res->lockname.name,
			res->inflight_assert_workers);
}

static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	BUG_ON(res->inflight_assert_workers == 0);
	res->inflight_assert_workers--;
	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
			dlm->name, res->lockname.len, res->lockname.name,
			res->inflight_assert_workers);
}

static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);
	__dlm_lockres_drop_inflight_worker(dlm, res);
	spin_unlock(&res->spinlock);
}
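
/* inflight_assert_workers, by contrast, counts queued
 * dlm_assert_master_worker() work items, so that paths tearing a
 * lockres down can wait for outstanding assert work to drain first. */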

/*
 * lookup a lock resource by name.
 * it may already exist in the hashtable.
 * lockid is null terminated.
 *
 * if not found, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.   need to wait around for that node
 * to assert_master (or die).
 *
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
					  const char *lockid,
					  int namelen,
					  int flags)
{
	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int hash;
	int tries = 0;
	int bit, wait_on_recovery = 0;

	BUG_ON(!lockid);

	hash = dlm_lockid_hash(lockid, namelen);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
	if (tmpres) {
		spin_unlock(&dlm->spinlock);
		spin_lock(&tmpres->spinlock);

		/*
		 * Right after dlm spinlock was released, dlm_thread could have
		 * purged the lockres. Check if lockres got unhashed. If so
		 * start over.
		 */
		if (hlist_unhashed(&tmpres->hash_node)) {
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		/* Wait on the thread that is mastering the resource */
		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
			__dlm_wait_on_lockres(tmpres);
			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		/* Wait on the resource purge to complete before continuing */
		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
			BUG_ON(tmpres->owner == dlm->node_num);
			__dlm_wait_on_lockres_flags(tmpres,
						    DLM_LOCK_RES_DROPPING_REF);
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		/* Grab inflight ref to pin the resource */
		dlm_lockres_grab_inflight_ref(dlm, tmpres);

		spin_unlock(&tmpres->spinlock);
		if (res) {
			spin_lock(&dlm->track_lock);
			if (!list_empty(&res->tracking))
				list_del_init(&res->tracking);
			else
				mlog(ML_ERROR, "Resource %.*s not "
						"on the Tracking list\n",
						res->lockname.len,
						res->lockname.name);
			spin_unlock(&dlm->track_lock);
			dlm_lockres_put(res);
		}
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		goto lookup;
	}

	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE!  return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		dlm_lockres_grab_inflight_ref(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		int mig;
		if (mle->type == DLM_MLE_MASTER) {
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		}
		mig = (mle->type == DLM_MLE_MIGRATION);
		/* if there is a migration in progress, let the migration
		 * finish before continuing.  we can wait for the absence
		 * of the MIGRATION mle: either the migrate finished or
		 * one of the nodes died and the mle was cleaned up.
		 * if there is a BLOCK here, but it already has a master
		 * set, we are too late.  the master does not have a ref
		 * for us in the refmap.  detach the mle and drop it.
		 * either way, go back to the top and start over. */
		if (mig || mle->master != O2NM_MAX_NODES) {
			BUG_ON(mig && mle->master == dlm->node_num);
			/* we arrived too late.  the master does not
			 * have a ref for us. retry. */
			mlog(0, "%s:%.*s: late on %s\n",
			     dlm->name, namelen, lockid,
			     mig ?  "MIGRATION" : "BLOCK");
			spin_unlock(&dlm->master_lock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			if (!mig)
				dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			/* this is lame, but we can't wait on either
			 * the mle or lockres waitqueue here */
			if (mig)
				msleep(100);
			goto lookup;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		__dlm_insert_mle(dlm, mle);

		/* still holding the dlm spinlock, check the recovery map
		 * to see if there are any nodes that still need to be
		 * considered.  these will not appear in the mle nodemap
		 * but they might own this lockres.  wait on them. */
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(0, "%s: res %.*s, At least one node (%d) "
			     "to recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		}
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable.  anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);

	/* since this lockres is new it doesn't require the spinlock */
	__dlm_lockres_grab_inflight_ref(dlm, res);

	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle_inuse(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

redo_request:
	while (wait_on_recovery) {
		/* any cluster changes that occurred after dropping the
		 * dlm spinlock would be detectable by a change on the mle,
		 * so we only need to clear out the recovery map once. */
		if (dlm_is_recovery_lock(lockid, namelen)) {
			mlog(0, "%s: Recovery map is not empty, but must "
			     "master $RECOVERY lock now\n", dlm->name);
			if (!dlm_pre_master_reco_lockres(dlm, res))
				wait_on_recovery = 0;
			else {
				mlog(0, "%s: waiting 500ms for heartbeat state "
				    "change\n", dlm->name);
				msleep(500);
			}
			continue;
		}

		dlm_kick_recovery_thread(dlm);
		msleep(1000);
		dlm_wait_for_recovery(dlm);

		spin_lock(&dlm->spinlock);
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(0, "%s: res %.*s, At least one node (%d) "
			     "to recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		} else
			wait_on_recovery = 0;
		spin_unlock(&dlm->spinlock);

		if (wait_on_recovery)
			dlm_wait_for_node_recovery(dlm, bit, 10000);
	}

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

	ret = -EINVAL;
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(res, mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			if (mle->master <= nodenum)
				break;
			/* if our master request has not reached the master
			 * yet, keep going until it does.  this is how the
			 * master will know that asserts are needed back to
			 * the lower nodes. */
			mlog(0, "%s: res %.*s, Requests only up to %u but "
			     "master is %u, keep going\n", dlm->name, namelen,
			     lockid, nodenum, mle->master);
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		wait_on_recovery = 1;
		mlog(0, "%s: res %.*s, Node map changed, redo the master "
		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		if (++tries > 20) {
			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
			     "dlm_wait_for_lock_mastery, blocked = %d\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			dlm_print_one_mle(mle);
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
	     res->lockname.name, res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle_inuse(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}
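
/* The mastery handshake driven by dlm_get_lock_resource(), in rough
 * outline (a summary, not a spec):
 *
 *	1. send a master request to every node in mle->vote_map
 *	   (dlm_do_master_request());
 *	2. gather the responses into mle->response_map / mle->maybe_map
 *	   and wait in dlm_wait_for_lock_mastery() until either some
 *	   node asserts mastery or all votes are in;
 *	3. if voting completes and this node holds the lowest node
 *	   number set in maybe_map, it becomes the master and broadcasts
 *	   dlm_do_assert_master() to the rest.
 */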


#define DLM_MASTERY_TIMEOUT_MS   5000

static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
		     res->lockname.len, res->lockname.name, res->owner);
		spin_unlock(&res->spinlock);
		/* this will cause the master to re-assert across
		 * the whole cluster, freeing up mles */
		if (res->owner != dlm->node_num) {
			ret = dlm_do_master_request(res, mle, res->owner);
			if (ret < 0) {
				/* give recovery a chance to run */
				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
				msleep(500);
				goto recheck;
			}
		}
		ret = 0;
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			     sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	} else {
		if (!voting_done) {
			mlog(0, "map not changed and voting not done "
			     "for %s:%.*s\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		}
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
			 	 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				/* ref was grabbed in get_lock_resource
				 * will be dropped in dlmlock_master */
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}

	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

		/*
		if (kref_read(&mle->mle_refs) < 2)
			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
			kref_read(&mle->mle_refs),
			res->lockname.len, res->lockname.name);
		*/
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
			     res->lockname.len, res->lockname.name);
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;   /* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	/* mastery reference obtained either during
	 * assert_master_handler or in get_lock_resource */
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}

struct dlm_bitmap_diff_iter
{
	int curnode;
	unsigned long *orig_bm;
	unsigned long *cur_bm;
	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
	NODE_DOWN = -1,
	NODE_NO_CHANGE = 0,
	NODE_UP
};
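
/* The iterator below visits every node whose state differs between two
 * bitmaps: diff_bm is the bitwise XOR of orig_bm and cur_bm (computed
 * as (p1 & ~p2) | (p2 & ~p1)), and each set bit is reported as
 * NODE_DOWN if the node was present in orig_bm, NODE_UP otherwise. */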

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
				      unsigned long *orig_bm,
				      unsigned long *cur_bm)
{
	unsigned long p1, p2;
	int i;

	iter->curnode = -1;
	iter->orig_bm = orig_bm;
	iter->cur_bm = cur_bm;

	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
		p1 = *(iter->orig_bm + i);
		p2 = *(iter->cur_bm + i);
		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
	}
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
				     enum dlm_node_state_change *state)
{
	int bit;

	if (iter->curnode >= O2NM_MAX_NODES)
		return -ENOENT;

	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
			    iter->curnode+1);
	if (bit >= O2NM_MAX_NODES) {
		iter->curnode = O2NM_MAX_NODES;
		return -ENOENT;
	}

	/* if it was there in the original then this node died */
	if (test_bit(bit, iter->orig_bm))
		*state = NODE_DOWN;
	else
		*state = NODE_UP;

	iter->curnode = bit;
	return bit;
}


static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;
	int ret = 0;

	mlog(0, "something happened such that the "
	     "master process may need to be restarted!\n");

	assert_spin_locked(&mle->spinlock);

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	while (node >= 0) {
		if (sc == NODE_UP) {
			/* a node came up.  clear any old vote from
			 * the response map and set it in the vote map
			 * then restart the mastery. */
			mlog(ML_NOTICE, "node %d up while restarting\n", node);

			/* redo the master request, but only for the new node */
			mlog(0, "sending request to new node\n");
			clear_bit(node, mle->response_map);
			set_bit(node, mle->vote_map);
		} else {
			mlog(ML_ERROR, "node down! %d\n", node);
			if (blocked) {
				int lowest = find_next_bit(mle->maybe_map,
						       O2NM_MAX_NODES, 0);

				/* act like it was never there */
				clear_bit(node, mle->maybe_map);

			       	if (node == lowest) {
					mlog(0, "expected master %u died"
					    " while this node was blocked "
					    "waiting on it!\n", node);
					lowest = find_next_bit(mle->maybe_map,
						       	O2NM_MAX_NODES,
						       	lowest+1);
					if (lowest < O2NM_MAX_NODES) {
						mlog(0, "%s:%.*s:still "
						     "blocked. waiting on %u "
						     "now\n", dlm->name,
						     res->lockname.len,
						     res->lockname.name,
						     lowest);
					} else {
						/* mle is an MLE_BLOCK, but
						 * there is now nothing left to
						 * block on.  we need to return
						 * all the way back out and try
						 * again with an MLE_MASTER.
						 * dlm_do_local_recovery_cleanup
						 * has already run, so the mle
						 * refcount is ok */
						mlog(0, "%s:%.*s: no "
						     "longer blocking. try to "
						     "master this here\n",
						     dlm->name,
						     res->lockname.len,
						     res->lockname.name);
						mle->type = DLM_MLE_MASTER;
						mle->mleres = res;
					}
				}
			}

			/* now blank out everything, as if we had never
			 * contacted anyone */
			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
			memset(mle->response_map, 0, sizeof(mle->response_map));
			/* reset the vote_map to the current node_map */
			memcpy(mle->vote_map, mle->node_map,
			       sizeof(mle->node_map));
			/* put myself into the maybe map */
			if (mle->type != DLM_MLE_BLOCK)
				set_bit(dlm->node_num, mle->maybe_map);