Skip to content

Commit

Permalink
Publishes notification on schema upgrade. (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
rotodd authored Sep 8, 2020
1 parent 27ab8e8 commit e979940
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using EnsureThat;
using MediatR;
using Microsoft.Health.SqlServer.Features.Schema.Messages.Get;
using Microsoft.Health.SqlServer.Features.Schema.Messages.Notifications;

namespace Microsoft.Health.SqlServer.Features.Schema.Extensions
{
Expand All @@ -31,5 +32,12 @@ public static async Task<GetCurrentVersionResponse> GetCurrentVersionAsync(this
GetCurrentVersionResponse response = await mediator.Send(request, cancellationToken);
return response;
}

public static async Task NotifySchemaUpgradedAsync(this IMediator mediator, int version)
{
EnsureArg.IsNotNull(mediator, nameof(mediator));

await mediator.Publish(new SchemaUpgradedNotification(version));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using EnsureThat;
using MediatR;

namespace Microsoft.Health.SqlServer.Features.Schema.Messages.Notifications
{
public class SchemaUpgradedNotification : INotification
{
public SchemaUpgradedNotification(int version)
{
EnsureArg.IsGte(version, 1);

Version = version;
}

public int Version { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Core;
using Microsoft.Health.SqlServer.Configs;
using Microsoft.Health.SqlServer.Features.Schema.Extensions;

namespace Microsoft.Health.SqlServer.Features.Schema
{
Expand All @@ -23,16 +25,23 @@ public class SchemaJobWorker
{
private readonly IServiceProvider _serviceProvider;
private readonly SqlServerDataStoreConfiguration _sqlServerDataStoreConfiguration;
private readonly IMediator _mediator;
private readonly ILogger _logger;

public SchemaJobWorker(IServiceProvider services, SqlServerDataStoreConfiguration sqlServerDataStoreConfiguration, ILogger<SchemaJobWorker> logger)
public SchemaJobWorker(
IServiceProvider services,
SqlServerDataStoreConfiguration sqlServerDataStoreConfiguration,
IMediator mediator,
ILogger<SchemaJobWorker> logger)
{
EnsureArg.IsNotNull(services, nameof(services));
EnsureArg.IsNotNull(sqlServerDataStoreConfiguration, nameof(sqlServerDataStoreConfiguration));
EnsureArg.IsNotNull(mediator, nameof(mediator));
EnsureArg.IsNotNull(logger, nameof(logger));

_serviceProvider = services;
_sqlServerDataStoreConfiguration = sqlServerDataStoreConfiguration;
_mediator = mediator;
_logger = logger;
}

Expand All @@ -58,8 +67,16 @@ public async Task ExecuteAsync(SchemaInformation schemaInformation, string insta
{
var schemaDataStore = scope.ServiceProvider.GetRequiredService<ISchemaDataStore>();

int? previous = schemaInformation.Current;
schemaInformation.Current = await schemaDataStore.UpsertInstanceSchemaInformationAsync(instanceName, schemaInformation, cancellationToken);

// If there was a change in the schema version
if (previous != schemaInformation.Current)
{
// Fire a notification
await _mediator.NotifySchemaUpgradedAsync((int)schemaInformation.Current);
}

await schemaDataStore.DeleteExpiredInstanceSchemaAsync(cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
using System.Data;
using System.Data.SqlClient;
using EnsureThat;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Health.SqlServer.Configs;
using Microsoft.Health.SqlServer.Features.Schema.Extensions;
using Microsoft.SqlServer.Management.Common;
using Microsoft.SqlServer.Management.Smo;

Expand All @@ -18,18 +20,26 @@ public class SchemaUpgradeRunner
private readonly IScriptProvider _scriptProvider;
private readonly IBaseScriptProvider _baseScriptProvider;
private readonly SqlServerDataStoreConfiguration _sqlServerDataStoreConfiguration;
private readonly IMediator _mediator;
private readonly ILogger<SchemaUpgradeRunner> _logger;

public SchemaUpgradeRunner(IScriptProvider scriptProvider, IBaseScriptProvider baseScriptProvider, SqlServerDataStoreConfiguration sqlServerDataStoreConfiguration, ILogger<SchemaUpgradeRunner> logger)
public SchemaUpgradeRunner(
IScriptProvider scriptProvider,
IBaseScriptProvider baseScriptProvider,
SqlServerDataStoreConfiguration sqlServerDataStoreConfiguration,
IMediator mediator,
ILogger<SchemaUpgradeRunner> logger)
{
EnsureArg.IsNotNull(scriptProvider, nameof(scriptProvider));
EnsureArg.IsNotNull(baseScriptProvider, nameof(baseScriptProvider));
EnsureArg.IsNotNull(sqlServerDataStoreConfiguration, nameof(sqlServerDataStoreConfiguration));
EnsureArg.IsNotNull(mediator, nameof(mediator));
EnsureArg.IsNotNull(logger, nameof(logger));

_scriptProvider = scriptProvider;
_baseScriptProvider = baseScriptProvider;
_sqlServerDataStoreConfiguration = sqlServerDataStoreConfiguration;
_mediator = mediator;
_logger = logger;
}

Expand All @@ -46,6 +56,7 @@ public void ApplySchema(int version, bool applyFullSchemaSnapshot)

CompleteSchemaVersion(version);

_mediator.NotifySchemaUpgradedAsync(version).Wait();
_logger.LogInformation("Completed applying schema {version}", version);
}

Expand Down

0 comments on commit e979940

Please sign in to comment.