LINBIT/windrbd

invalid bitmap operation - local write starts after resync but completed before it

jumpapex opened this issue · 3 comments

install WinDRBD from install-windrbd-1.0.2-signed.exe

build kernel module src from windrbd 1.1.2 and add some debug message

setup windows drbd resource with protocol C on node0, node1

  1. do "create-md" and "up" on both node
  2. do "skip initial sync" and "primary" on node 0
  3. do "down" on node 1
  4. on node0, write 4096 bytes at sector 0,
    from debug message: drbd_bm_set_bits: 0
  5. on node1, do "up"
    node 0 debug message: drbd_bm_clear_bits: 0

Everything is OK now.

To test a special case: a local write that arrives while a resync is in progress.
E.g.:
With an unstable network, a resync is in progress.
After the local read for the resync has completed, a write to the same block arrives.
The local write request completes (without being sent to the remote) after P_RS_WRITE_ACK has been received but before its processing has completed.

but it is difficult to setup.

so some test code is inserted in the drbd kernel module.

A test function, test_sync_when_resync, is created to simulate a local disk write (not sent to the remote).
it is put at:
got_BlockAck
{
...
if (p->block_id == ID_SYNCER) {
test_sync_when_resync(device, blksize, sector);
drbd_set_in_sync(peer_device, sector, blksize);
dec_rs_pending(peer_device);
atomic_sub(blksize >> 9, &connection->rs_in_flight);
return 0;
}

Replace the driver on node0, and repeat the previous test steps 1-5.

from node 0 debug message:
drbd_bm_set_bits: 0
drbd_bm_clear_bits: 0

This is not expected, because drbd_set_in_sync clears the bit that the dummy write set, so the write will be lost.

Or am I missing something?

Thanks!

the inserted code:
// drbd_req.c
/*
 * Test hook: simulate a local-only write (never sent to a peer) covering
 * [sector, sector + total_size) on @device.
 *
 * @device:     DRBD device the dummy write is issued against.
 * @total_size: size of the simulated write in bytes (logged in 512-byte
 *              sectors).
 * @sector:     start sector of the simulated write.
 *
 * Builds a data-less write bio, pushes it through drbd_request_prepare()
 * and drbd_send_dummy_write_and_submit(), and then completes the request's
 * private bio by hand with BLK_STS_OK instead of submitting it to disk.
 * Returns silently if the dummy bio or the request cannot be created.
 */
void test_sync_when_resync(struct drbd_device* device, unsigned int total_size, sector_t sector)
{
	NTSTATUS status;
	struct bio *bio;
#ifdef CONFIG_DRBD_TIMING_STATS
	/* Only declared with timing stats enabled; presumably
	 * ktime_get_accounting() and drbd_request_prepare() drop this
	 * argument otherwise -- TODO confirm. */
	ktime_t start_kt;
#endif
	ULONG_PTR start_jif;
	struct drbd_request *req;

	DbgPrintEx(DPFLTR_IHVDRIVER_ID, DPFLTR_INFO_LEVEL, "test_sync_when_resync: %" PRIu64 " - %u IN\n", sector, total_size >> 9);

	status = windrbd_make_dummy_drbd_write_request(device->this_bdev, total_size, sector, &bio);
	if (status != STATUS_SUCCESS)
		return;

	ktime_get_accounting(start_kt);
	start_jif = jiffies;

	// from __drbd_make_request
	inc_ap_bio(device, bio_data_dir(bio));

	req = drbd_request_prepare(device, bio, start_kt, start_jif);
	if (IS_ERR_OR_NULL(req))
		return;

	D_ASSERT(device, req->private_bio);

	drbd_send_dummy_write_and_submit(device, req);

	/* NOTE(review): drbd_send_dummy_write_and_submit() drops and NULLs
	 * req->private_bio when I/O is suspended or writes are not allowed;
	 * the dereferences below assume neither happened. */

	/* Fake a successful local completion of the private bio. */
	atomic_set(&req->private_bio->bi_requests_completed, 0);
	req->private_bio->bi_status = BLK_STS_OK;
	req->private_bio->bi_using_big_buffer = false;

	bio_endio(req->private_bio);

	/* Exit trace now carries the real function name so it can be
	 * matched with the entry trace above. */
	DbgPrintEx(DPFLTR_IHVDRIVER_ID, DPFLTR_INFO_LEVEL, "test_sync_when_resync OUT\n");
}
// windrbd_device.c
/*
 * bi_end_io callback installed by windrbd_make_dummy_drbd_write_request().
 *
 * @bio: the finished dummy bio; it and its bi_common_data are released here.
 *
 * Translates the bio status into an NTSTATUS, records a failure in the
 * shared bio_collection, and frees the collection and the bio. No IRP is
 * completed here.
 *
 * The READ branches are kept from the code this was derived from; the
 * dummy request builder only sets REQ_OP_WRITE.
 */
static void windrbd_bio_dummy_write_finished(struct bio* bio)
{
	int i;
	KIRQL flags;
	NTSTATUS status = STATUS_SUCCESS;
	int error = blk_status_to_errno(bio->bi_status);

	if (error == 0) {
		if (bio_data_dir(bio) == READ) {
			if (!bio->bi_common_data->bc_device_failed && bio->bi_upper_irp && bio->bi_upper_irp->MdlAddress) {
				char *user_buffer = bio->bi_upper_irp_buffer;

				if (user_buffer != NULL) {
					int offset = bio->bi_mdl_offset;

					/* Copy every segment back into the upper buffer. */
					for (i = 0; i < bio->bi_vcnt; i++) {
						RtlCopyMemory(user_buffer + offset, ((char*)bio->bi_io_vec[i].bv_page->addr) + bio->bi_io_vec[i].bv_offset, bio->bi_io_vec[i].bv_len);
						offset += bio->bi_io_vec[i].bv_len;
					}
				} else {
					printk(KERN_WARNING "MmGetSystemAddressForMdlSafe returned NULL\n");
					status = STATUS_INVALID_PARAMETER;
				}
			}
		}
	} else {
		printk(KERN_ERR "I/O failed with %d\n", error);

		/* This translates to error 55
		 * (ERROR_DEV_NOT_EXIST: The specified network
		 * resource or device is no longer available.
		 * which is quite close to what we mean. Also
		 * under Windows 10 / Server 2016?
		 */
		status = STATUS_DEVICE_DOES_NOT_EXIST;
	}

	if (bio_data_dir(bio) == READ) {
		for (i = 0; i < bio->bi_vcnt; i++)
			put_page(bio->bi_io_vec[i].bv_page);
	}

	/* Account for this bio and latch a failure under the collection lock.
	 * The counter's new value is not needed: no IRP is completed here. */
	spin_lock_irqsave(&bio->bi_common_data->bc_device_failed_lock, flags);
	(void)atomic_inc_return(&bio->bi_common_data->bc_num_completed);
	if (status != STATUS_SUCCESS)
		bio->bi_common_data->bc_device_failed = 1;
	spin_unlock_irqrestore(&bio->bi_common_data->bc_device_failed_lock, flags);

	/* NOTE(review): the collection is freed unconditionally. That is only
	 * safe while a single bio references bi_common_data, which holds for
	 * windrbd_make_dummy_drbd_write_request() because it stops after
	 * building the first bio. */
	kfree(bio->bi_common_data);
	bio_put(bio);
}

/*
 * Build a data-less WRITE bio for test purposes and hand it to the caller.
 *
 * @dev:        block device the bio targets; must belong to a Primary
 *              resource.
 * @total_size: requested size in bytes; shortened if it would run past
 *              dev->d_size.
 * @sector:     start sector (in units of dev->bd_block_size).
 * @_bio:       out parameter, receives the new bio on STATUS_SUCCESS.
 *
 * Returns STATUS_INVALID_PARAMETER if the resource is not Primary, the start
 * lies at or beyond the end of the device, or the (possibly shortened) size
 * is 0; STATUS_INSUFFICIENT_RESOURCES if an allocation fails;
 * STATUS_SUCCESS otherwise.
 *
 * The bio has no IRP, no buffer and no pages attached (bi_vcnt stays 0) and
 * is not queued anywhere by this function. Its completion callback is
 * windrbd_bio_dummy_write_finished(), which frees bi_common_data and the bio.
 */
NTSTATUS windrbd_make_dummy_drbd_write_request(struct block_device* dev, unsigned int total_size, sector_t sector, struct bio** _bio)
{
	struct _IRP *irp = NULL;	/* dummy request: there is no upper IRP */
	char *buffer = NULL;		/* ... and no data buffer */
	unsigned long rw = WRITE;
	struct bio *bio;
	int b;
	struct bio_collection *common_data;

	if (rw == WRITE && dev->drbd_device->resource->role[NOW] != R_PRIMARY) {
		printk("Attempt to write when not Primary\n");
		return STATUS_INVALID_PARAMETER;
	}
	if (sector * dev->bd_block_size >= dev->d_size) {
		dbg("Attempt to read past the end of the device: dev->bd_block_size is %d sector is %lld (%llu) byte offset is %lld (%llu) dev->d_size is %lld rw is %s\n", dev->bd_block_size, sector, sector, sector * dev->bd_block_size, sector * dev->bd_block_size, dev->d_size, rw == WRITE ? "WRITE" : "READ");
		return STATUS_INVALID_PARAMETER;
	}
	if (sector * dev->bd_block_size + total_size > dev->d_size) {
		dbg("Attempt to read past the end of the device, request shortened\n");
		total_size = dev->d_size - sector * dev->bd_block_size;
	}
	if (total_size == 0) {
		printk("I/O request of size 0.\n");
		return STATUS_INVALID_PARAMETER;
	}

	/* Number of MAX_BIO_SIZE chunks the request would need, and the size
	 * of the last (possibly partial) chunk. */
	int bio_count = (total_size - 1) / MAX_BIO_SIZE + 1;
	int this_bio_size;
	int last_bio_size = total_size % MAX_BIO_SIZE;
	if (last_bio_size == 0)
		last_bio_size = MAX_BIO_SIZE;

	common_data = kzalloc(sizeof(*common_data), 0, 'DRBD');
	if (common_data == NULL) {
		printk("Cannot allocate common data.\n");
		return STATUS_INSUFFICIENT_RESOURCES;
	}
	atomic_set(&common_data->bc_num_completed, 0);
	common_data->bc_total_size = total_size;
	common_data->bc_num_requests = bio_count;
	common_data->bc_device_failed = 0;
	spin_lock_init(&common_data->bc_device_failed_lock);

	for (b = 0; b < bio_count; b++) {
		this_bio_size = (b == bio_count - 1) ? last_bio_size : MAX_BIO_SIZE;

		bio = bio_alloc(GFP_NOIO, 1, 'DBRD');
		if (bio == NULL) {
			printk("Couldn't allocate bio.\n");
			/* Nothing references common_data yet (the loop never
			 * gets past its first iteration), so release it here
			 * instead of leaking it. */
			kfree(common_data);
			return STATUS_INSUFFICIENT_RESOURCES;
		}
		bio->bi_opf = (rw == WRITE ? REQ_OP_WRITE : REQ_OP_READ);
		bio->bi_bdev = dev;
		bio->bi_max_vecs = 1;
		bio->bi_vcnt = 0;	/* no data pages are attached */
		bio->bi_paged_memory = (bio_data_dir(bio) == WRITE);
		bio->bi_iter.bi_size = this_bio_size;
		bio->bi_iter.bi_sector = sector + b * MAX_BIO_SIZE / dev->bd_block_size;
		bio->bi_upper_irp_buffer = buffer;
		bio->bi_mdl_offset = (unsigned long long)b * MAX_BIO_SIZE;
		bio->bi_common_data = common_data;

		bio->bi_end_io = windrbd_bio_dummy_write_finished;
		bio->bi_upper_irp = irp;

		dbg("bio->bi_iter.bi_size: %d bio->bi_iter.bi_sector: %d bio->bi_mdl_offset: %d\n", bio->bi_iter.bi_size, bio->bi_iter.bi_sector, bio->bi_mdl_offset);

		/* Only the first bio is ever built: it is returned to the
		 * caller rather than queued, and the loop stops here. */
		*_bio = bio;
		break;
	}

	return STATUS_SUCCESS;
}
// drbd_req.c
/*
 * Test variant of the send-and-submit step, used by test_sync_when_resync().
 *
 * @device: DRBD device the request belongs to.
 * @req:    request previously returned by drbd_request_prepare().
 *
 * Runs under resource->req_lock: registers @req in the transfer log and on
 * the pending lists and marks its private bio TO_BE_SUBMITTED. Two steps are
 * compiled out on purpose (see the #if 0 / #else pairs):
 *  - non-empty writes are never handed to drbd_process_write_request(), so
 *    they are treated as "no remote";
 *  - the private bio is not submitted and no master bio is completed here;
 *    the function only asserts that a private bio was marked for submission
 *    and that no master bio completion is pending.
 */
static void drbd_send_dummy_write_and_submit(struct drbd_device* device, struct drbd_request* req)
{
	struct drbd_resource *resource = device->resource;
	struct drbd_peer_device *peer_device = NULL; /* for read */
	const int rw = bio_data_dir(req->master_bio);
	struct bio_and_error m = { NULL, };
	bool no_remote = false;
	bool submit_private_bio = false;
	KIRQL flags;

	spin_lock_irqsave(&resource->req_lock, flags);
	if (rw == WRITE) {
		/* This may temporarily give up the req_lock,
		 * but will re-acquire it before it returns here.
		 * Needs to be before the check on drbd_suspended() */
		complete_conflicting_writes(req, &flags);
		/* no more giving up req_lock from now on! */

		/* check for congestion, and potentially stop sending
		 * full data updates, but start sending "dirty bits" only. */
		maybe_pull_ahead(device);
	}

	if (drbd_suspended(device)) {
		/* push back and retry: */
		req->local_rq_state |= RQ_POSTPONED;
		if (req->private_bio) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
		}
		goto out;
	}

	/* We fail READ early, if we can not serve it.
	 * We must do this before req is registered on any lists.
	 * Otherwise, drbd_req_complete() will queue failed READ for retry. */
	if (rw != WRITE) {
		peer_device = find_peer_device_for_read(req);
		if (!peer_device && !req->private_bio)
			goto nodata;
	}

	/* which transfer log epoch does this belong to? */
	req->epoch = atomic_read(&resource->current_tle_nr);

	if (rw == WRITE)
		resource->dagtag_sector += req->i.size >> 9;
	req->dagtag_sector = resource->dagtag_sector;
	/* no point in adding empty flushes to the transfer log,
	 * they are mapped to drbd barriers already. */
	if (likely(req->i.size != 0)) {
		if (rw == WRITE) {
			struct drbd_request *req2;

			resource->current_tle_writes++;
			list_for_each_entry_reverse(struct drbd_request, req2, &resource->transfer_log, tl_requests) {
				if (req2->local_rq_state & RQ_WRITE) {
					/* Make the new write request depend on
					 * the previous one. */
					BUG_ON(req2->destroy_next);
					req2->destroy_next = req;
					kref_get(&req->kref);
					break;
				}
			}
		}
		list_add_tail(&req->tl_requests, &resource->transfer_log);
	}

	if (rw == WRITE) {
		if (req->private_bio && !may_do_writes(device)) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
			goto nodata;
		}
		/* Need to replicate writes.  Unless it is an empty flush,
		 * which is better mapped to a DRBD P_BARRIER packet,
		 * also for drbd wire protocol compatibility reasons.
		 * If this was a flush, just start a new epoch.
		 * Unless the current epoch was empty anyways, or we are not currently
		 * replicating, in which case there is no point. */
		if (unlikely(req->i.size == 0)) {
			/* The only size==0 bios we expect are empty flushes. */
			D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
			_req_mod(req, QUEUE_AS_DRBD_BARRIER, NULL);
		}
#if 0
		else if (!drbd_process_write_request(req))
			no_remote = true;
		wake_all_senders(resource);
#else
		/* Dummy write: never replicate, always local-only. */
		else {
			no_remote = true;
		}
#endif
	}
	else {
		if (peer_device) {
			_req_mod(req, TO_BE_SENT, peer_device);
			_req_mod(req, QUEUE_FOR_NET_READ, peer_device);
			wake_up(&peer_device->connection->sender_work.q_wait);
		}
		else
			no_remote = true;
	}

	if (no_remote == false) {
		struct drbd_plug_cb *plug = drbd_check_plugged(resource);
		if (plug)
			drbd_update_plug(plug, req);
	}

	/* If it took the fast path in drbd_request_prepare, add it here.
	 * The slow path has added it already. */
	if (list_empty(&req->req_pending_master_completion))
		list_add_tail(&req->req_pending_master_completion,
			&device->pending_master_completion[rw == WRITE]);
	if (req->private_bio) {
		/* pre_submit_jif is used in request_timer_fn() */
		req->pre_submit_jif = jiffies;
		ktime_get_accounting(req->pre_submit_kt);
		list_add_tail(&req->req_pending_local,
			&device->pending_completion[rw == WRITE]);
		_req_mod(req, TO_BE_SUBMITTED, NULL);
		/* needs to be marked within the same spinlock
		 * but we need to give up the spinlock to submit */
		submit_private_bio = true;
	}
	else if (no_remote) {
nodata:
		if (drbd_ratelimit()) {
#pragma warning( push )
#pragma warning (disable : 4457)
			/* warning C4457: declaration of 'device' hides function parameter */
			struct drbd_device *device = req->device;
			drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
				(unsigned long long)req->i.sector, req->i.size >> 9);
#pragma warning( pop )
		}
		/* A write may have been queued for send_oos, however.
		 * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
	}

out:
	drbd_req_put_completion_ref(req, &m, 1);
	spin_unlock_irqrestore(&resource->req_lock, flags);

	/* Even though above is a kref_put(), this is safe.
	 * As long as we still need to submit our private bio,
	 * we hold a completion ref, and the request cannot disappear.
	 * If however this request did not even have a private bio to submit
	 * (e.g. remote read), req may already be invalid now.
	 * That's why we cannot check on req->private_bio. */

#if 0
	if (submit_private_bio)
		drbd_submit_req_private_bio(req);
	if (m.bio)
		complete_master_bio(device, &m);
#else
	/* Dummy write: the private bio is not submitted here and no master
	 * bio completion is expected. */
	D_ASSERT(device, submit_private_bio);
	D_ASSERT(device, m.bio == NULL);
#endif
}

Hi,

Thanks for testing this. When you tested, did you make sure that
the data is actually committed to the WinDRBD disk? There are
write caches in the layers above WinDRBD you need to make
sure that they are flushed with your tests.

Also keep in mind that when connected (and protocol is C)
writes are always made on all available diskful nodes before
returning to the application, so you won't see them in the sync
stats.

Regarding your code patches, could you clone drbd and
windrbd and push your changes on github? That is probably
easier to maintain.

Thanks again for your hard work :)

Best regards,

-- Johannes

fio (Flexible I/O tester) would be a tool where one can write and flush the caches...

Maybe I did not describe the case clearly; please consider the following case:

  1. a resync(e.g. sector 0 with 4K size) is in progress.
  2. after local read of the resync completed, a local write(sector 0 with 4K size) to same block comes,
  3. P_RS_WRITE_ACK of the resync is received but its processing has not completed yet (assume it completes a little later).
  4. The network breaks, so the local write request will complete without being sent to the remote; it will set bit 0 of the bitmap.
  5. now continue process P_RS_WRITE_ACK, drbd_set_in_sync will clear the bit 0 which local write just set.

So, is the change made by the later local write not recorded?

the test case is difficult to setup in real world, but it may happen :)

The test code tries to simulate this case, and I will push it later.