diff options
-rw-r--r-- | fs/eventpoll.c | 158 |
1 files changed, 122 insertions, 36 deletions
diff --git a/fs/eventpoll.c b/fs/eventpoll.c index 53c3d4677dd4..4a0e98d87fcc 100644 --- a/fs/eventpoll.c +++ b/fs/eventpoll.c @@ -50,10 +50,10 @@ * * 1) epmutex (mutex) * 2) ep->mtx (mutex) - * 3) ep->wq.lock (spinlock) + * 3) ep->lock (rwlock) * * The acquire order is the one listed above, from 1 to 3. - * We need a spinlock (ep->wq.lock) because we manipulate objects + * We need a rwlock (ep->lock) because we manipulate objects * from inside the poll callback, that might be triggered from * a wake_up() that in turn might be called from IRQ context. * So we can't sleep inside the poll callback and hence we need @@ -85,7 +85,7 @@ * of epoll file descriptors, we use the current recursion depth as * the lockdep subkey. * It is possible to drop the "ep->mtx" and to use the global - * mutex "epmutex" (together with "ep->wq.lock") to have it working, + * mutex "epmutex" (together with "ep->lock") to have it working, * but having "ep->mtx" will make the interface more scalable. * Events that require holding "epmutex" are very rare, while for * normal operations the epoll private "ep->mtx" will guarantee @@ -182,8 +182,6 @@ struct epitem { * This structure is stored inside the "private_data" member of the file * structure and represents the main data structure for the eventpoll * interface. - * - * Access to it is protected by the lock inside wq. */ struct eventpoll { /* @@ -203,13 +201,16 @@ struct eventpoll { /* List of ready file descriptors */ struct list_head rdllist; + /* Lock which protects rdllist and ovflist */ + rwlock_t lock; + /* RB tree root used to store monitored fd structs */ struct rb_root_cached rbr; /* * This is a single linked list that chains all the "struct epitem" that * happened while transferring ready events to userspace w/out - * holding ->wq.lock. + * holding ->lock. */ struct epitem *ovflist; @@ -697,17 +698,17 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep, * because we want the "sproc" callback to be able to do it * in a lockless way. */ - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); list_splice_init(&ep->rdllist, &txlist); WRITE_ONCE(ep->ovflist, NULL); - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); /* * Now call the callback function. */ res = (*sproc)(ep, &txlist, priv); - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); /* * During the time we spent inside the "sproc" callback, some * other events might have been queued by the poll callback. @@ -749,11 +750,11 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep, * the ->poll() wait list (delayed after we release the lock). */ if (waitqueue_active(&ep->wq)) - wake_up_locked(&ep->wq); + wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); if (!ep_locked) mutex_unlock(&ep->mtx); @@ -793,10 +794,10 @@ static int ep_remove(struct eventpoll *ep, struct epitem *epi) rb_erase_cached(&epi->rbn, &ep->rbr); - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); if (ep_is_linked(epi)) list_del_init(&epi->rdllink); - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); wakeup_source_unregister(ep_wakeup_source(epi)); /* @@ -846,7 +847,7 @@ static void ep_free(struct eventpoll *ep) * Walks through the whole tree by freeing each "struct epitem". At this * point we are sure no poll callbacks will be lingering around, and also by * holding "epmutex" we can be sure that no file cleanup code will hit - * us during this operation. So we can avoid the lock on "ep->wq.lock". + * us during this operation. So we can avoid the lock on "ep->lock". * We do not need to lock ep->mtx, either, we only do it to prevent * a lockdep warning. */ @@ -1027,6 +1028,7 @@ static int ep_alloc(struct eventpoll **pep) goto free_uid; mutex_init(&ep->mtx); + rwlock_init(&ep->lock); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); @@ -1116,21 +1118,107 @@ struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd, } #endif /* CONFIG_CHECKPOINT_RESTORE */ +/** + * Adds a new entry to the tail of the list in a lockless way, i.e. + * multiple CPUs are allowed to call this function concurrently. + * + * Beware: it is necessary to prevent any other modifications of the + * existing list until all changes are completed, in other words + * concurrent list_add_tail_lockless() calls should be protected + * with a read lock, where write lock acts as a barrier which + * makes sure all list_add_tail_lockless() calls are fully + * completed. + * + * Also an element can be locklessly added to the list only in one + * direction i.e. either to the tail either to the head, otherwise + * concurrent access will corrupt the list. + * + * Returns %false if element has been already added to the list, %true + * otherwise. + */ +static inline bool list_add_tail_lockless(struct list_head *new, + struct list_head *head) +{ + struct list_head *prev; + + /* + * This is simple 'new->next = head' operation, but cmpxchg() + * is used in order to detect that same element has been just + * added to the list from another CPU: the winner observes + * new->next == new. + */ + if (cmpxchg(&new->next, new, head) != new) + return false; + + /* + * Initially ->next of a new element must be updated with the head + * (we are inserting to the tail) and only then pointers are atomically + * exchanged. XCHG guarantees memory ordering, thus ->next should be + * updated before pointers are actually swapped and pointers are + * swapped before prev->next is updated. + */ + + prev = xchg(&head->prev, new); + + /* + * It is safe to modify prev->next and new->prev, because a new element + * is added only to the tail and new->next is updated before XCHG. + */ + + prev->next = new; + new->prev = prev; + + return true; +} + +/** + * Chains a new epi entry to the tail of the ep->ovflist in a lockless way, + * i.e. multiple CPUs are allowed to call this function concurrently. + * + * Returns %false if epi element has been already chained, %true otherwise. + */ +static inline bool chain_epi_lockless(struct epitem *epi) +{ + struct eventpoll *ep = epi->ep; + + /* Check that the same epi has not been just chained from another CPU */ + if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR) + return false; + + /* Atomically exchange tail */ + epi->next = xchg(&ep->ovflist, epi); + + return true; +} + /* * This is the callback that is passed to the wait queue wakeup * mechanism. It is called by the stored file descriptors when they * have events to report. + * + * This callback takes a read lock in order not to content with concurrent + * events from another file descriptors, thus all modifications to ->rdllist + * or ->ovflist are lockless. Read lock is paired with the write lock from + * ep_scan_ready_list(), which stops all list modifications and guarantees + * that lists state is seen correctly. + * + * Another thing worth to mention is that ep_poll_callback() can be called + * concurrently for the same @epi from different CPUs if poll table was inited + * with several wait queues entries. Plural wakeup from different CPUs of a + * single wait queue is serialized by wq.lock, but the case when multiple wait + * queues are used should be detected accordingly. This is detected using + * cmpxchg() operation. */ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key) { int pwake = 0; - unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; __poll_t pollflags = key_to_poll(key); + unsigned long flags; int ewake = 0; - spin_lock_irqsave(&ep->wq.lock, flags); + read_lock_irqsave(&ep->lock, flags); ep_set_busy_poll_napi_id(epi); @@ -1159,17 +1247,15 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v * chained in ep->ovflist and requeued later on. */ if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) { - if (epi->next == EP_UNACTIVE_PTR) { - epi->next = READ_ONCE(ep->ovflist); - WRITE_ONCE(ep->ovflist, epi); + if (epi->next == EP_UNACTIVE_PTR && + chain_epi_lockless(epi)) ep_pm_stay_awake_rcu(epi); - } goto out_unlock; } /* If this file is already in the ready list we exit soon */ - if (!ep_is_linked(epi)) { - list_add_tail(&epi->rdllink, &ep->rdllist); + if (!ep_is_linked(epi) && + list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) { ep_pm_stay_awake_rcu(epi); } @@ -1194,13 +1280,13 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v break; } } - wake_up_locked(&ep->wq); + wake_up(&ep->wq); } if (waitqueue_active(&ep->poll_wait)) pwake++; out_unlock: - spin_unlock_irqrestore(&ep->wq.lock, flags); + read_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */ if (pwake) @@ -1485,7 +1571,7 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event, goto error_remove_epi; /* We have to drop the new item inside our item list to keep track of it */ - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); /* record NAPI ID of new item if present */ ep_set_busy_poll_napi_id(epi); @@ -1497,12 +1583,12 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event, /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) - wake_up_locked(&ep->wq); + wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); atomic_long_inc(&ep->user->epoll_watches); @@ -1528,10 +1614,10 @@ error_unregister: * list, since that is used/cleaned only inside a section bound by "mtx". * And ep_insert() is called with "mtx" held. */ - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); if (ep_is_linked(epi)) list_del_init(&epi->rdllink); - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); wakeup_source_unregister(ep_wakeup_source(epi)); @@ -1575,9 +1661,9 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, * 1) Flush epi changes above to other CPUs. This ensures * we do not miss events from ep_poll_callback if an * event occurs immediately after we call f_op->poll(). - * We need this because we did not take ep->wq.lock while + * We need this because we did not take ep->lock while * changing epi above (but ep_poll_callback does take - * ep->wq.lock). + * ep->lock). * * 2) We also need to ensure we do not miss _past_ events * when calling f_op->poll(). This barrier also @@ -1596,18 +1682,18 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, * list, push it inside. */ if (ep_item_poll(epi, &pt, 1)) { - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); if (!ep_is_linked(epi)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) - wake_up_locked(&ep->wq); + wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); } /* We have to call this outside the lock */ @@ -1768,9 +1854,9 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, */ timed_out = 1; - spin_lock_irq(&ep->wq.lock); + write_lock_irq(&ep->lock); eavail = ep_events_available(ep); - spin_unlock_irq(&ep->wq.lock); + write_unlock_irq(&ep->lock); goto send_events; } |