-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a cache for GatewayBackend to HaGatewayManager #501
base: main
Are you sure you want to change the base?
Conversation
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Choi Wai Yiu.
|
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Choi Wai Yiu.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am supportive of adding caching at this layer, but it's important to note that this alone won't be sufficient for the GW to continue routing traffic when the DB is unavailable, since writes happen in-the-loop of new query submission, e.g. here:
trino-gateway/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java
Line 275 in 110cd7c
queryHistoryManager.submitQueryDetail(queryDetail); |
We need a more holistic strategy around how to handle DB unavailability which may include treating some writes as best-effort and skipping them in case of DB unavailability.
cc @surajkn
private final GatewayBackendDao dao; | ||
private final AtomicReference<List<GatewayBackend>> cache = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use something like a Guava/Caffeine cache here that has more proper cache management including cache expiration times. Remember that the same database will be shared by multiple GW instances. This code seems to assume that it can keep the cache in sync by simply invalidating the cache when update operations are made, but updates may be made out-of-band by other GW instances.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xkrogen
Thank you for your review. @oneonestar and I are currently working on addressing the issues in this PR based on your feedback and advice :)
@xkrogen |
cc: @oneonestar |
queryDetail.getCaptureTime()); | ||
} | ||
catch (Exception e) { | ||
logger.error(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's provide some additional context in the error message.
We also need to emit some metrics based on this. We shouldn't just silently fail -- this can mask errors. When this happens, we should have metics that alerting can be set up on.
return upcast(proxyBackendList); | ||
} | ||
|
||
@Override | ||
public Optional<ProxyBackendConfiguration> getBackendByName(String name) | ||
{ | ||
List<GatewayBackend> proxyBackendList = dao.findByName(name); | ||
List<GatewayBackend> proxyBackendList = fetchAllBackends().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So with this change, instead of letting the SQL backend do the filtering (by name / active status / routing group), we always fetch the full set of backends, and do the filtering in-process.
I think this is fine. We expect the number of backends to be small, no more than in the tens, so it shouldn't be a substantial perf hit. But let's call it out clearly in the PR description.
This also means we can remove all the (newly) unused methods in GatewayBackendDao
:
trino-gateway/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java
Lines 26 to 48 in d9998b1
@SqlQuery(""" | |
SELECT * FROM gateway_backend | |
WHERE active = true | |
""") | |
List<GatewayBackend> findActiveBackend(); | |
@SqlQuery(""" | |
SELECT * FROM gateway_backend | |
WHERE active = true AND routing_group = 'adhoc' | |
""") | |
List<GatewayBackend> findActiveAdhocBackend(); | |
@SqlQuery(""" | |
SELECT * FROM gateway_backend | |
WHERE active = true AND routing_group = :routingGroup | |
""") | |
List<GatewayBackend> findActiveBackendByRoutingGroup(String routingGroup); | |
@SqlQuery(""" | |
SELECT * FROM gateway_backend | |
WHERE name = :name | |
""") | |
List<GatewayBackend> findByName(String name); |
{ | ||
try { | ||
List<GatewayBackend> backends = gatewayBackendSupplier.get(); | ||
cache.set(backends); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to understand the logic here since we are kind of layering two caches on top of each other -- the memoized supplier and the AtomicReference
. I guess you are making it such that:
- We use the memoized supplier to reduce DB load / increase performance for frequent re-lookups (within 500ms)
- We use the AtomicReference, which never expires, to provide a stale view of the data when the DB is inaccessible
This makes sense to me, but it's a bit confusing to track how these two interact. Why not just use a Guava Cache? It gives you much more configurability/flexibility so we can have all of this logic in a single place (using refreshAfterWrite()
, which does exactly what we want here) and gives us more hooks to add monitoring:
private static final Logger LOG = Logger.get(HaGatewayManager.class);
private static final Object ALL_BACKEND_CACHE_KEY = new Object();
private final GatewayBackendDao dao;
private final LoadingCache<Object, List<GatewayBackend>> backendCache;
private final CounterStat backendLookupSuccesses = new CounterStat();
private final CounterStat backendLookupFailures = new CounterStat();
public HaGatewayManager(Jdbi jdbi)
{
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class);
backendCache = CacheBuilder
.newBuilder()
.initialCapacity(1)
.refreshAfterWrite(Duration.ofSeconds(1))
.build(CacheLoader.asyncReloading(
CacheLoader.from(this::loadAllBackends),
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())));
}
private List<GatewayBackend> loadAllBackends()
{
try {
var backends = dao.findAll();
backendLookupSuccesses.update(1);
return backends;
} catch (Exception e) {
backendLookupFailures.update(1);
LOG.warn(e, "Failed to load backends");
throw e;
}
}
// probably should be named "getAllBackends()" since it may or may not actually do any fetch
private List<GatewayBackend> fetchAllBackends()
{
return backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY);
}
private final AtomicReference<List<GatewayBackend>> cache = new AtomicReference<>(); | ||
|
||
public HaGatewayManager(Jdbi jdbi) | ||
{ | ||
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class); | ||
cache.set(ImmutableList.of()); | ||
fetchAllBackendsToCache(); | ||
gatewayBackendSupplier = Suppliers.memoizeWithExpiration(dao::findAll, 500, MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably let the cache TTL be configurable since there is a perf vs staleness tradeoff here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add some testing for this as well. We want to ensure that, even if the DB goes down, we can still submit new queries.
We can add a new TestGatewayHaDataStoreFailure
where we set up a gateway, populate some backends, submit a query to populate the cache, then simulate a database failure. After this, we should try to submit new queries, and ensure everything works as expected.
Sorry for delay in my response, got swamped with some semesterly planning exercises. Business cycles and bureaucracy, ah the fun :) |
Description
Currently, Trino Gateway stops routing queries when the database is unavailable.
This PR adds a cache of
GatewayBackend
to theHaGatewayManager
, ensuring that queries can still be routed even when the database is unavailable.The cache is read-only, so update operations to
GatewayBackend
will still fail if the database is unavailable.Release notes
( x) This is not user-visible or is docs only, and no release notes are required.