1// SPDX-License-Identifier: GPL-2.0-or-later
2/* Rolling buffer helpers
3 *
4 * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
5 * Written by David Howells (dhowells@redhat.com)
6 */
7
8#include <linux/bitops.h>
9#include <linux/pagemap.h>
10#include <linux/rolling_buffer.h>
11#include <linux/slab.h>
12#include "internal.h"
13
14static atomic_t debug_ids;
15
16/**
17 * netfs_folioq_alloc - Allocate a folio_queue struct
18 * @rreq_id: Associated debugging ID for tracing purposes
19 * @gfp: Allocation constraints
20 * @trace: Trace tag to indicate the purpose of the allocation
21 *
22 * Allocate, initialise and account the folio_queue struct and log a trace line
23 * to mark the allocation.
24 */
25struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp,
26 unsigned int /*enum netfs_folioq_trace*/ trace)
27{
28 struct folio_queue *fq;
29
30 fq = kmalloc(sizeof(*fq), gfp);
31 if (fq) {
32 netfs_stat(&netfs_n_folioq);
33 folioq_init(folioq: fq, rreq_id);
34 fq->debug_id = atomic_inc_return(v: &debug_ids);
35 trace_netfs_folioq(fq, trace);
36 }
37 return fq;
38}
39EXPORT_SYMBOL(netfs_folioq_alloc);
40
41/**
42 * netfs_folioq_free - Free a folio_queue struct
43 * @folioq: The object to free
44 * @trace: Trace tag to indicate which free
45 *
46 * Free and unaccount the folio_queue struct.
47 */
48void netfs_folioq_free(struct folio_queue *folioq,
49 unsigned int /*enum netfs_trace_folioq*/ trace)
50{
51 trace_netfs_folioq(fq: folioq, trace);
52 netfs_stat_d(&netfs_n_folioq);
53 kfree(objp: folioq);
54}
55EXPORT_SYMBOL(netfs_folioq_free);
56
57/*
58 * Initialise a rolling buffer. We allocate an empty folio queue struct to so
59 * that the pointers can be independently driven by the producer and the
60 * consumer.
61 */
62int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id,
63 unsigned int direction)
64{
65 struct folio_queue *fq;
66
67 fq = netfs_folioq_alloc(rreq_id, GFP_NOFS, netfs_trace_folioq_rollbuf_init);
68 if (!fq)
69 return -ENOMEM;
70
71 roll->head = fq;
72 roll->tail = fq;
73 iov_iter_folio_queue(i: &roll->iter, direction, folioq: fq, first_slot: 0, offset: 0, count: 0);
74 return 0;
75}
76
77/*
78 * Add another folio_queue to a rolling buffer if there's no space left.
79 */
80int rolling_buffer_make_space(struct rolling_buffer *roll)
81{
82 struct folio_queue *fq, *head = roll->head;
83
84 if (!folioq_full(folioq: head))
85 return 0;
86
87 fq = netfs_folioq_alloc(head->rreq_id, GFP_NOFS, netfs_trace_folioq_make_space);
88 if (!fq)
89 return -ENOMEM;
90 fq->prev = head;
91
92 roll->head = fq;
93 if (folioq_full(folioq: head)) {
94 /* Make sure we don't leave the master iterator pointing to a
95 * block that might get immediately consumed.
96 */
97 if (roll->iter.folioq == head &&
98 roll->iter.folioq_slot == folioq_nr_slots(folioq: head)) {
99 roll->iter.folioq = fq;
100 roll->iter.folioq_slot = 0;
101 }
102 }
103
104 /* Make sure the initialisation is stored before the next pointer.
105 *
106 * [!] NOTE: After we set head->next, the consumer is at liberty to
107 * immediately delete the old head.
108 */
109 smp_store_release(&head->next, fq);
110 return 0;
111}
112
113/*
114 * Decant the list of folios to read into a rolling buffer.
115 */
116ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll,
117 struct readahead_control *ractl,
118 struct folio_batch *put_batch)
119{
120 struct folio_queue *fq;
121 struct page **vec;
122 int nr, ix, to;
123 ssize_t size = 0;
124
125 if (rolling_buffer_make_space(roll) < 0)
126 return -ENOMEM;
127
128 fq = roll->head;
129 vec = (struct page **)fq->vec.folios;
130 nr = __readahead_batch(rac: ractl, array: vec + folio_batch_count(fbatch: &fq->vec),
131 array_sz: folio_batch_space(fbatch: &fq->vec));
132 ix = fq->vec.nr;
133 to = ix + nr;
134 fq->vec.nr = to;
135 for (; ix < to; ix++) {
136 struct folio *folio = folioq_folio(folioq: fq, slot: ix);
137 unsigned int order = folio_order(folio);
138
139 fq->orders[ix] = order;
140 size += PAGE_SIZE << order;
141 trace_netfs_folio(folio, why: netfs_folio_trace_read);
142 if (!folio_batch_add(fbatch: put_batch, folio))
143 folio_batch_release(fbatch: put_batch);
144 }
145 WRITE_ONCE(roll->iter.count, roll->iter.count + size);
146
147 /* Store the counter after setting the slot. */
148 smp_store_release(&roll->next_head_slot, to);
149 return size;
150}
151
152/*
153 * Append a folio to the rolling buffer.
154 */
155ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio,
156 unsigned int flags)
157{
158 ssize_t size = folio_size(folio);
159 int slot;
160
161 if (rolling_buffer_make_space(roll) < 0)
162 return -ENOMEM;
163
164 slot = folioq_append(folioq: roll->head, folio);
165 if (flags & ROLLBUF_MARK_1)
166 folioq_mark(folioq: roll->head, slot);
167 if (flags & ROLLBUF_MARK_2)
168 folioq_mark2(folioq: roll->head, slot);
169
170 WRITE_ONCE(roll->iter.count, roll->iter.count + size);
171
172 /* Store the counter after setting the slot. */
173 smp_store_release(&roll->next_head_slot, slot);
174 return size;
175}
176
177/*
178 * Delete a spent buffer from a rolling queue and return the next in line. We
179 * don't return the last buffer to keep the pointers independent, but return
180 * NULL instead.
181 */
182struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll)
183{
184 struct folio_queue *spent = roll->tail, *next = READ_ONCE(spent->next);
185
186 if (!next)
187 return NULL;
188 next->prev = NULL;
189 netfs_folioq_free(spent, netfs_trace_folioq_delete);
190 roll->tail = next;
191 return next;
192}
193
194/*
195 * Clear out a rolling queue. Folios that have mark 1 set are put.
196 */
197void rolling_buffer_clear(struct rolling_buffer *roll)
198{
199 struct folio_batch fbatch;
200 struct folio_queue *p;
201
202 folio_batch_init(fbatch: &fbatch);
203
204 while ((p = roll->tail)) {
205 roll->tail = p->next;
206 for (int slot = 0; slot < folioq_count(folioq: p); slot++) {
207 struct folio *folio = folioq_folio(folioq: p, slot);
208
209 if (!folio)
210 continue;
211 if (folioq_is_marked(folioq: p, slot)) {
212 trace_netfs_folio(folio, why: netfs_folio_trace_put);
213 if (!folio_batch_add(fbatch: &fbatch, folio))
214 folio_batch_release(fbatch: &fbatch);
215 }
216 }
217
218 netfs_folioq_free(p, netfs_trace_folioq_clear);
219 }
220
221 folio_batch_release(fbatch: &fbatch);
222}
223