summaryrefslogtreecommitdiff
path: root/drivers/block/rbd.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--drivers/block/rbd.c322
1 files changed, 106 insertions, 216 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 013c7a549fb6..cba3d0278b86 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -71,7 +71,7 @@
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
-#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+#define RBD_READ_ONLY_DEFAULT false
/*
* block device image metadata (in-memory version)
@@ -94,7 +94,7 @@ struct rbd_image_header {
};
struct rbd_options {
- int notify_timeout;
+ bool read_only;
};
/*
@@ -174,10 +174,13 @@ struct rbd_device {
/* protects updating the header */
struct rw_semaphore header_rwsem;
+ /* name of the snapshot this device reads from */
char snap_name[RBD_MAX_SNAP_NAME_LEN];
- u32 cur_snap; /* index+1 of current snapshot within snap context
- 0 - for the head */
- int read_only;
+ /* id of the snapshot this device reads from */
+ u64 snap_id; /* current snapshot id */
+ /* whether the snap_id this device reads from still exists */
+ bool snap_exists;
+ bool read_only;
struct list_head node;
@@ -186,6 +189,7 @@ struct rbd_device {
/* sysfs related */
struct device dev;
+ unsigned long open_count;
};
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
@@ -198,10 +202,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
- struct device_attribute *attr,
- const char *buf,
- size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
struct rbd_snap *snap);
@@ -247,13 +247,15 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
{
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
- rbd_get_dev(rbd_dev);
-
- set_device_ro(bdev, rbd_dev->read_only);
-
if ((mode & FMODE_WRITE) && rbd_dev->read_only)
return -EROFS;
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+ rbd_get_dev(rbd_dev);
+ set_device_ro(bdev, rbd_dev->read_only);
+ rbd_dev->open_count++;
+ mutex_unlock(&ctl_mutex);
+
return 0;
}
@@ -261,7 +263,11 @@ static int rbd_release(struct gendisk *disk, fmode_t mode)
{
struct rbd_device *rbd_dev = disk->private_data;
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+ BUG_ON(!rbd_dev->open_count);
+ rbd_dev->open_count--;
rbd_put_dev(rbd_dev);
+ mutex_unlock(&ctl_mutex);
return 0;
}
@@ -343,17 +349,24 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
* mount options
*/
enum {
- Opt_notify_timeout,
Opt_last_int,
/* int args above */
Opt_last_string,
/* string args above */
+ Opt_read_only,
+ Opt_read_write,
+ /* Boolean args above */
+ Opt_last_bool,
};
static match_table_t rbdopt_tokens = {
- {Opt_notify_timeout, "notify_timeout=%d"},
/* int args above */
/* string args above */
+ {Opt_read_only, "read_only"},
+ {Opt_read_only, "ro"}, /* Alternate spelling */
+ {Opt_read_write, "read_write"},
+ {Opt_read_write, "rw"}, /* Alternate spelling */
+ /* Boolean args above */
{-1, NULL}
};
@@ -378,13 +391,18 @@ static int parse_rbd_opts_token(char *c, void *private)
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
argstr[0].from);
+ } else if (token > Opt_last_string && token < Opt_last_bool) {
+ dout("got Boolean token %d\n", token);
} else {
dout("got token %d\n", token);
}
switch (token) {
- case Opt_notify_timeout:
- rbdopt->notify_timeout = intval;
+ case Opt_read_only:
+ rbdopt->read_only = true;
+ break;
+ case Opt_read_write:
+ rbdopt->read_only = false;
break;
default:
BUG_ON(token);
@@ -408,7 +426,7 @@ static struct rbd_client *rbd_get_client(const char *mon_addr,
if (!rbd_opts)
return ERR_PTR(-ENOMEM);
- rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
+ rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
opt = ceph_parse_options(options, mon_addr,
mon_addr + mon_addr_len,
@@ -450,7 +468,9 @@ static void rbd_client_release(struct kref *kref)
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
dout("rbd_release_client %p\n", rbdc);
+ spin_lock(&rbd_client_list_lock);
list_del(&rbdc->node);
+ spin_unlock(&rbd_client_list_lock);
ceph_destroy_client(rbdc->client);
kfree(rbdc->rbd_opts);
@@ -463,9 +483,7 @@ static void rbd_client_release(struct kref *kref)
*/
static void rbd_put_client(struct rbd_device *rbd_dev)
{
- spin_lock(&rbd_client_list_lock);
kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
- spin_unlock(&rbd_client_list_lock);
rbd_dev->rbd_client = NULL;
}
@@ -498,7 +516,7 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
snap_count = le32_to_cpu(ondisk->snap_count);
header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
- snap_count * sizeof (*ondisk),
+ snap_count * sizeof(u64),
gfp_flags);
if (!header->snapc)
return -ENOMEM;
@@ -552,21 +570,6 @@ err_snapc:
return -ENOMEM;
}
-static int snap_index(struct rbd_image_header *header, int snap_num)
-{
- return header->total_snaps - snap_num;
-}
-
-static u64 cur_snap_id(struct rbd_device *rbd_dev)
-{
- struct rbd_image_header *header = &rbd_dev->header;
-
- if (!rbd_dev->cur_snap)
- return 0;
-
- return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
-}
-
static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
u64 *seq, u64 *size)
{
@@ -605,17 +608,18 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
snapc->seq = header->snap_seq;
else
snapc->seq = 0;
- dev->cur_snap = 0;
- dev->read_only = 0;
+ dev->snap_id = CEPH_NOSNAP;
+ dev->snap_exists = false;
+ dev->read_only = dev->rbd_client->rbd_opts->read_only;
if (size)
*size = header->image_size;
} else {
ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
if (ret < 0)
goto done;
-
- dev->cur_snap = header->total_snaps - ret;
- dev->read_only = 1;
+ dev->snap_id = snapc->seq;
+ dev->snap_exists = true;
+ dev->read_only = true; /* No choice for snapshots */
}
ret = 0;
@@ -626,7 +630,7 @@ done:
static void rbd_header_free(struct rbd_image_header *header)
{
- kfree(header->snapc);
+ ceph_put_snap_context(header->snapc);
kfree(header->snap_names);
kfree(header->snap_sizes);
}
@@ -904,13 +908,10 @@ static int rbd_do_request(struct request *rq,
dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
- down_read(&dev->header_rwsem);
-
osdc = &dev->rbd_client->client->osdc;
req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
false, GFP_NOIO, pages, bio);
if (!req) {
- up_read(&dev->header_rwsem);
ret = -ENOMEM;
goto done_pages;
}
@@ -937,15 +938,15 @@ static int rbd_do_request(struct request *rq,
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
layout->fl_pg_preferred = cpu_to_le32(-1);
layout->fl_pg_pool = cpu_to_le32(dev->poolid);
- ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
- req, ops);
+ ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+ req, ops);
+ BUG_ON(ret != 0);
ceph_osdc_build_request(req, ofs, &len,
ops,
snapc,
&mtime,
req->r_oid, req->r_oid_len);
- up_read(&dev->header_rwsem);
if (linger_req) {
ceph_osdc_set_request_linger(osdc, req);
@@ -1210,7 +1211,7 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
if (ret < 0)
return ret;
- ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
+ ops[0].watch.ver = cpu_to_le64(ver);
ops[0].watch.cookie = notify_id;
ops[0].watch.flag = 0;
@@ -1230,6 +1231,7 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
struct rbd_device *dev = (struct rbd_device *)data;
+ u64 hver;
int rc;
if (!dev)
@@ -1239,12 +1241,13 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
notify_id, (int)opcode);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rc = __rbd_update_snaps(dev);
+ hver = dev->header.obj_version;
mutex_unlock(&ctl_mutex);
if (rc)
pr_warning(RBD_DRV_NAME "%d got notification but failed to "
" update snaps: %d\n", dev->major, rc);
- rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
+ rbd_req_sync_notify_ack(dev, hver, notify_id, dev->obj_md_name);
}
/*
@@ -1321,71 +1324,7 @@ static int rbd_req_sync_unwatch(struct rbd_device *dev,
return ret;
}
-struct rbd_notify_info {
- struct rbd_device *dev;
-};
-
-static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
- struct rbd_device *dev = (struct rbd_device *)data;
- if (!dev)
- return;
-
- dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
- notify_id, (int)opcode);
-}
-
-/*
- * Request sync osd notify
- */
-static int rbd_req_sync_notify(struct rbd_device *dev,
- const char *obj)
-{
- struct ceph_osd_req_op *ops;
- struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
- struct ceph_osd_event *event;
- struct rbd_notify_info info;
- int payload_len = sizeof(u32) + sizeof(u32);
- int ret;
-
- ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
- if (ret < 0)
- return ret;
-
- info.dev = dev;
-
- ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
- (void *)&info, &event);
- if (ret < 0)
- goto fail;
-
- ops[0].watch.ver = 1;
- ops[0].watch.flag = 1;
- ops[0].watch.cookie = event->cookie;
- ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
- ops[0].watch.timeout = 12;
-
- ret = rbd_req_sync_op(dev, NULL,
- CEPH_NOSNAP,
- 0,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ops,
- 1, obj, 0, 0, NULL, NULL, NULL);
- if (ret < 0)
- goto fail_event;
-
- ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
- dout("ceph_osdc_wait_event returned %d\n", ret);
- rbd_destroy_ops(ops);
- return 0;
-
-fail_event:
- ceph_osdc_cancel_event(event);
-fail:
- rbd_destroy_ops(ops);
- return ret;
-}
-
+#if 0
/*
* Request sync osd read
*/
@@ -1425,6 +1364,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
dout("cls_exec returned %d\n", ret);
return ret;
}
+#endif
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
@@ -1457,6 +1397,7 @@ static void rbd_rq_fn(struct request_queue *q)
u64 ofs;
int num_segs, cur_seg = 0;
struct rbd_req_coll *coll;
+ struct ceph_snap_context *snapc;
/* peek at request from block layer */
if (!rq)
@@ -1483,6 +1424,20 @@ static void rbd_rq_fn(struct request_queue *q)
spin_unlock_irq(q->queue_lock);
+ down_read(&rbd_dev->header_rwsem);
+
+ if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
+ up_read(&rbd_dev->header_rwsem);
+ dout("request for non-existent snapshot");
+ spin_lock_irq(q->queue_lock);
+ __blk_end_request_all(rq, -ENXIO);
+ continue;
+ }
+
+ snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+
+ up_read(&rbd_dev->header_rwsem);
+
dout("%s 0x%x bytes at 0x%llx\n",
do_write ? "write" : "read",
size, blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1492,6 +1447,7 @@ static void rbd_rq_fn(struct request_queue *q)
if (!coll) {
spin_lock_irq(q->queue_lock);
__blk_end_request_all(rq, -ENOMEM);
+ ceph_put_snap_context(snapc);
continue;
}
@@ -1515,13 +1471,13 @@ static void rbd_rq_fn(struct request_queue *q)
/* init OSD command: write or read */
if (do_write)
rbd_req_write(rq, rbd_dev,
- rbd_dev->header.snapc,
+ snapc,
ofs,
op_size, bio,
coll, cur_seg);
else
rbd_req_read(rq, rbd_dev,
- cur_snap_id(rbd_dev),
+ rbd_dev->snap_id,
ofs,
op_size, bio,
coll, cur_seg);
@@ -1538,6 +1494,8 @@ next_seg:
if (bp)
bio_pair_release(bp);
spin_lock_irq(q->queue_lock);
+
+ ceph_put_snap_context(snapc);
}
}
@@ -1641,55 +1599,6 @@ out_dh:
return rc;
}
-/*
- * create a snapshot
- */
-static int rbd_header_add_snap(struct rbd_device *dev,
- const char *snap_name,
- gfp_t gfp_flags)
-{
- int name_len = strlen(snap_name);
- u64 new_snapid;
- int ret;
- void *data, *p, *e;
- u64 ver;
- struct ceph_mon_client *monc;
-
- /* we should create a snapshot only if we're pointing at the head */
- if (dev->cur_snap)
- return -EINVAL;
-
- monc = &dev->rbd_client->client->monc;
- ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
- dout("created snapid=%lld\n", new_snapid);
- if (ret < 0)
- return ret;
-
- data = kmalloc(name_len + 16, gfp_flags);
- if (!data)
- return -ENOMEM;
-
- p = data;
- e = data + name_len + 16;
-
- ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
- ceph_encode_64_safe(&p, e, new_snapid, bad);
-
- ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
- data, p - data, &ver);
-
- kfree(data);
-
- if (ret < 0)
- return ret;
-
- dev->header.snapc->seq = new_snapid;
-
- return 0;
-bad:
- return -ERANGE;
-}
-
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
struct rbd_snap *snap;
@@ -1714,10 +1623,15 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
if (ret < 0)
return ret;
+ down_write(&rbd_dev->header_rwsem);
+
/* resized? */
- set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
+ if (rbd_dev->snap_id == CEPH_NOSNAP) {
+ sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
- down_write(&rbd_dev->header_rwsem);
+ dout("setting size to %llu sectors", (unsigned long long) size);
+ set_capacity(rbd_dev->disk, size);
+ }
snap_seq = rbd_dev->header.snapc->seq;
if (rbd_dev->header.total_snaps &&
@@ -1726,10 +1640,12 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
if head moves */
follow_seq = 1;
- kfree(rbd_dev->header.snapc);
+ ceph_put_snap_context(rbd_dev->header.snapc);
kfree(rbd_dev->header.snap_names);
kfree(rbd_dev->header.snap_sizes);
+ rbd_dev->header.obj_version = h.obj_version;
+ rbd_dev->header.image_size = h.image_size;
rbd_dev->header.total_snaps = h.total_snaps;
rbd_dev->header.snapc = h.snapc;
rbd_dev->header.snap_names = h.snap_names;
@@ -1833,8 +1749,13 @@ static ssize_t rbd_size_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+ sector_t size;
+
+ down_read(&rbd_dev->header_rwsem);
+ size = get_capacity(rbd_dev->disk);
+ up_read(&rbd_dev->header_rwsem);
- return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
+ return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
static ssize_t rbd_major_show(struct device *dev,
@@ -1905,7 +1826,6 @@ static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
-static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static struct attribute *rbd_attrs[] = {
&dev_attr_size.attr,
@@ -1915,7 +1835,6 @@ static struct attribute *rbd_attrs[] = {
&dev_attr_name.attr,
&dev_attr_current_snap.attr,
&dev_attr_refresh.attr,
- &dev_attr_create_snap.attr,
NULL
};
@@ -2084,7 +2003,14 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
cur_id = rbd_dev->header.snapc->snaps[i - 1];
if (!i || old_snap->id < cur_id) {
- /* old_snap->id was skipped, thus was removed */
+ /*
+ * old_snap->id was skipped, thus was
+ * removed. If this rbd_dev is mapped to
+ * the removed snapshot, record that it no
+ * longer exists, to prevent further I/O.
+ */
+ if (rbd_dev->snap_id == old_snap->id)
+ rbd_dev->snap_exists = false;
__rbd_remove_snap_dev(rbd_dev, old_snap);
continue;
}
@@ -2232,8 +2158,8 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
struct rbd_device *rbd_dev;
rbd_dev = list_entry(tmp, struct rbd_device, node);
- if (rbd_id > max_id)
- max_id = rbd_id;
+ if (rbd_dev->id > max_id)
+ max_id = rbd_dev->id;
}
spin_unlock(&rbd_dev_list_lock);
@@ -2530,6 +2456,11 @@ static ssize_t rbd_remove(struct bus_type *bus,
goto done;
}
+ if (rbd_dev->open_count) {
+ ret = -EBUSY;
+ goto done;
+ }
+
__rbd_remove_all_snaps(rbd_dev);
rbd_bus_del_dev(rbd_dev);
@@ -2538,47 +2469,6 @@ done:
return ret;
}
-static ssize_t rbd_snap_add(struct device *dev,
- struct device_attribute *attr,
- const char *buf,
- size_t count)
-{
- struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- int ret;
- char *name = kmalloc(count + 1, GFP_KERNEL);
- if (!name)
- return -ENOMEM;
-
- snprintf(name, count, "%s", buf);
-
- mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
- ret = rbd_header_add_snap(rbd_dev,
- name, GFP_KERNEL);
- if (ret < 0)
- goto err_unlock;
-
- ret = __rbd_update_snaps(rbd_dev);
- if (ret < 0)
- goto err_unlock;
-
- /* shouldn't hold ctl_mutex when notifying.. notify might
- trigger a watch callback that would need to get that mutex */
- mutex_unlock(&ctl_mutex);
-
- /* make a best effort, don't error if failed */
- rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
-
- ret = count;
- kfree(name);
- return ret;
-
-err_unlock:
- mutex_unlock(&ctl_mutex);
- kfree(name);
- return ret;
-}
-
/*
* create control files in sysfs
* /sys/bus/rbd/...