Handshake issue when getting replayid from async call to database
kathariyasunny16 opened this issue · 8 comments
I am trying to get the replay ID from the Cosmos database before creating a Bayeux client object. The handshake does not succeed after I call bayeuxClient.Handshake(). The same code works fine when I do not make the Cosmos DB call. Please find the code snippet below.
var authResponse = await _authenticateConnectedApp.Authenticate().ConfigureAwait(false);
if (authResponse.AccessToken is not null)
{
    var (channel, apiVersion) = Config.GetSaleForceAccountChannelDetails();
    // DB call
    var lastUsedReplayId = await _operatingParametersRepository.GetSaleForceLastUsedReplayId(channel).ConfigureAwait(false);
    var endpointUrl = $"{authResponse.InstanceUrl}/cometd/{apiVersion}";
    var readTimeOut = 120 * 1000;
    var transportOptions = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
    {
        {ClientTransport.TIMEOUT_OPTION, readTimeOut},
        {ClientTransport.MAX_NETWORK_DELAY_OPTION, readTimeOut}
    };
    var headers = new NameValueCollection
    {
        {HttpRequestHeader.Authorization.ToString(), $"Bearer {authResponse.AccessToken}"}
    };
    var transport = new LongPollingTransport(transportOptions, headers);
    var transports = new ClientTransport[]
    {
        transport
    };
    // Create a CometD client instance
    var bayeuxClient = new BayeuxClient(endpointUrl, transports);
    bayeuxClient.Handshake();
    bayeuxClient.WaitFor(100000, new List<BayeuxClient.State> {BayeuxClient.State.CONNECTED});
    log.LogInformation("Handshake Status : {Status}", bayeuxClient.Handshook);
    bayeuxClient.GetChannel(channel, lastUsedReplayId).Subscribe(new SalesForceMessageListener());
    log.LogInformation("Connection Status : {Status}. Now, Waiting for the event", bayeuxClient.Connected);
}
// CosmosHelperRepository Code Called from above function
public async Task<long> GetSaleForceLastUsedReplayId(string channel)
{
    try
    {
        var dbExistingRecordList = await _cosmosHelper.GetQuery(x =>
            x.Pk == nameof(SalesForceReplayIdRunDetail)
            && x.ChannelName == channel);
        var lastUsedReplayId = dbExistingRecordList.FirstOrDefault()?.LastUsedReplayId ?? -1;
        return lastUsedReplayId;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        throw;
    }
}
// Implementation of Cosmos helper GetQuery method called in above repository function
public async Task<IList<T>> GetQuery(Expression<Func<T, bool>> predicate)
{
    var container = GetContainer();
    var query = container.GetItemLinqQueryable<T>()
        .Where(predicate);
    var list = new List<T>();
    using var feedIterator = _feedIteratorProvider.GetFeedIterator(query);
    while (feedIterator.HasMoreResults)
    {
        list.AddRange(await feedIterator.ReadNextAsync());
    }
    return list;
}
Just an FYI, I am able to get the replayId from cosmos db using the above code but the handshake is not happening.
@kdcllc please have a look. I am trying to resolve this issue for the last 2 days. Any help will be much appreciated.
Thank you so much for your response, @kdcllc. I have updated the code in the issue description above for your reference.
Hi @kdcllc,
I debugged this and found that the issue is not with the async call to the Cosmos repository to get the replay ID.
I think the handshake fails when I use the code below in the Azure Function startup to resolve the dependencies of the Cosmos repository and helper.
Code used in startup to resolve the dependency:
builder.Services.AddTransient<IOperatingParametersRepository>(s => new OperatingParametersRepository(
    new CosmosHelper<SalesForceReplayIdRunDetail>(
        s.GetService<CosmosClient>(),
        Settings.Cosmos.DatabaseName,
        Settings.Cosmos.OperatingParametersContainer,
        s.GetService<IFeedIteratorProvider>(),
        s.GetService<ISerializer>(),
        s.GetService<ILogger<CosmosHelper<SalesForceReplayIdRunDetail>>>())));
Cosmos Repository Constructor:
public OperatingParametersRepository(ICosmosHelper<SalesForceReplayIdRunDetail> cosmosHelper)
{
    _cosmosHelper = cosmosHelper;
}
Cosmos Helper Constructor:
public CosmosHelper(CosmosClient cosmosClient, string databaseName, string containerName,
    IFeedIteratorProvider feedIteratorProvider, ISerializer serializer, ILogger<CosmosHelper<T>> logger)
    : base(cosmosClient, containerName, serializer, logger)
{
    _cosmosClient = cosmosClient;
    _databaseName = databaseName;
    _containerName = containerName;
    _feedIteratorProvider = feedIteratorProvider;
}
@kdcllc @martin-podlubny @r-matias @jesbacon,
Any guidance on this?
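One thing that may help narrow this down: the startup registration above resolves its dependencies with GetService, which returns null when a service was never registered, so a missing registration would only surface later and possibly far from its cause. GetRequiredService throws an InvalidOperationException at the point of resolution instead. The sketch below only illustrates that difference and is not a confirmed diagnosis of the handshake failure. It assumes the Microsoft.Extensions.DependencyInjection package; RegistrationCheck is a hypothetical name, and IFeedIteratorProvider here is an empty stand-in for the interface in the post.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Empty stand-in for the interface named in the post (assumption for illustration).
public interface IFeedIteratorProvider { }

public static class RegistrationCheck
{
    // Describes how each lookup behaves when the service was never registered.
    public static string Describe()
    {
        var services = new ServiceCollection(); // nothing registered, on purpose
        using var provider = services.BuildServiceProvider();

        // GetService<T> returns null for an unregistered service.
        var viaGetService = provider.GetService<IFeedIteratorProvider>();
        var first = viaGetService is null ? "GetService: null" : "GetService: instance";

        try
        {
            // GetRequiredService<T> throws for an unregistered service.
            provider.GetRequiredService<IFeedIteratorProvider>();
            return first + "; GetRequiredService: instance";
        }
        catch (InvalidOperationException)
        {
            return first + "; GetRequiredService: threw InvalidOperationException";
        }
    }
}
```

Swapping the GetService calls in the factory lambda for GetRequiredService would make a missing CosmosClient, IFeedIteratorProvider, ISerializer, or ILogger registration fail loudly at resolution time, which at least rules that cause in or out.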
@kathariyasunny16
Does this code work as a console application but not as an Azure Function?
What operating system are you using?
@kdcllc
We are using Windows 10 and haven't tried the console app as we need to use this in the Azure function.
Hello @kdcllc @martin-podlubny @r-matias ,
I have a quick question. How long does the long-polling connection stay alive when I am subscribed to one channel? My subscriber receives events for 3 hours after subscribing, automatically reconnecting every 2 minutes during that time. After 3 hours it stops receiving events: instead of a /cometd/54.0/connect request, it sends /cometd/54.0/handshake, and then the subscription stops.
So, I just want to know whether I need to rerun the function that subscribes to the channel after 3 hours. If so, when I try to run the function again, it is not able to complete a successful handshake.
Please suggest some solution. Thanks in advance.
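One general way to handle a dropped subscription like the one described above is to rebuild the connection from scratch (authenticate again, create a new client, handshake, resubscribe) and retry with a growing delay when an attempt fails. The following is a minimal sketch of that retry loop only, not a confirmed fix for the 3-hour cutoff. Reconnect and the connectAsync delegate are hypothetical names; the delegate stands in for whatever code performs the actual connect steps and reports success.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Reconnect
{
    // connectAsync is a hypothetical delegate: it is expected to authenticate,
    // build a new client, handshake, resubscribe, and return true on success.
    public static async Task<bool> WithBackoffAsync(
        Func<CancellationToken, Task<bool>> connectAsync,
        int maxAttempts,
        TimeSpan initialDelay,
        CancellationToken cancellationToken = default)
    {
        var delay = initialDelay;
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (await connectAsync(cancellationToken).ConfigureAwait(false))
            {
                return true;
            }

            if (attempt == maxAttempts)
            {
                break; // no point waiting after the final attempt
            }

            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            delay = TimeSpan.FromTicks(delay.Ticks * 2); // double the wait each time
        }

        return false;
    }
}
```

A caller might pass a lambda that wraps the connection code from the original post and returns bayeuxClient.Connected, with something like five attempts and a ten-second initial delay; those numbers are placeholders to tune.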
Hi @kdcllc @martin-podlubny @r-matias @jesbacon ,
Please reply to the above comment. I am stuck on this and need your support.
@kathariyasunny16 when I use a lastReplayId or -2, I get none of the missed responses. Is it working for you?
What does your SalesForceMessageListener class look like?
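For context on the replay IDs mentioned in this thread: in the Salesforce Streaming API, a replay ID of -1 asks for only new events broadcast after the client subscribes, and -2 asks for all events still inside the retention window plus new ones. Any other value asks for events after that specific stored event. The sketch below names those two special values and mirrors the `?? -1` fallback from the repository code in the original post. ReplayIds and Resolve are hypothetical names introduced for illustration.

```csharp
using System;

public static class ReplayIds
{
    // Special replay ID values defined by the Salesforce Streaming API.
    public const long NewEventsOnly = -1;     // only events sent after subscribing
    public const long AllRetainedEvents = -2; // all events within the retention window

    // Returns the stored replay ID, or NewEventsOnly when nothing has been stored yet.
    public static long Resolve(long? stored) => stored ?? NewEventsOnly;
}
```

With this helper, ReplayIds.Resolve(null) gives -1 and ReplayIds.Resolve(42) gives 42, which matches the behavior of the `?? -1` expression in GetSaleForceLastUsedReplayId.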