Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

C# - Azure IotHubException 'The connection was closed because another AMQP client opened a receive link' when trying to receive a message

I am trying to receive a message with the Azure DeviceClient, as shown below:

public async Task<List<string>> RecieveMessage(string correlationId)
{
    var response = new List<string>();
    InitializeDeviceClient("AMQP");
    var flag = true;
    while (flag)
    {
        // ReceiveAsync returns null if no message arrives within the timeout.
        Microsoft.Azure.Devices.Client.Message receivedMessage = await deviceClient.ReceiveAsync(TimeSpan.FromSeconds(120));

        if (receivedMessage == null)
        {
            await Task.Delay(100).ConfigureAwait(false);
            continue;
        }

        Trace.WriteLine(receivedMessage.CorrelationId.ToString());
        // Every received message is completed, including ones with a different correlation ID.
        await this.deviceClient.CompleteAsync(receivedMessage);
        if (receivedMessage.CorrelationId != correlationId)
        {
            continue;
        }

        var content = Encoding.UTF8.GetString(receivedMessage.GetBytes());
        response.Add(content);
        flag = false;
    }
    return response;
}

The ReceiveAsync call intermittently throws the following exception when running in VSTS:

Microsoft.Azure.Devices.Client.Exceptions.IotHubException: error(condition:com.microsoft:link-creation-conflict,description:{"errorCode":409002,"trackingId":"e22773c954504357bcd262ebcf7e6c9e-G:3-TimeStamp:01/28/2021 23:24:27","message":"The connection was closed because another AMQP client opened a receive link. Only one connection is allowed per client identity. To learn more, see https://aka.ms/iothub409002","timestampUtc":"2021-01-28T23:24:27.2617772Z"},info:[com.microsoft:is-filtered:True]) ---> Microsoft.Azure.Amqp.AmqpException: {"errorCode":409002,"trackingId":"e22773c954504357bcd262ebcf7e6c9e-G:3-TimeStamp:01/28/2021 23:24:27","message":"The connection was closed because another AMQP client opened a receive link. Only one connection is allowed per client identity. To learn more, see https://aka.ms/iothub409002","timestampUtc":"2021-01-28T23:24:27.2617772Z"}

StackTrace:

at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
       at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result)
       at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result)
       at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
    --- End of stack trace from previous location where exception was thrown ---
       at Microsoft.Azure.Devices.Client.Transport.AmqpIoT.AmqpIoTSession.OpenReceivingAmqpLinkAsync(DeviceIdentity deviceIdentity, AmqpSession amqpSession, Nullable`1 senderSettleMode, Nullable`1 receiverSettleMode, String deviceTemplate, String moduleTemplate, String linkSuffix, String correlationId, TimeSpan timeout)
    --- End of inner exception stack trace ---
        at Microsoft.Azure.Devices.Client.Transport.AmqpIoT.AmqpIoTSession.OpenReceivingAmqpLinkAsync(DeviceIdentity deviceIdentity, AmqpSession amqpSession, Nullable`1 senderSettleMode, Nullable`1 receiverSettleMode, String deviceTemplate, String moduleTemplate, String linkSuffix, String correlationId, TimeSpan timeout)
       at Microsoft.Azure.Devices.Client.Transport.AmqpIoT.AmqpIoTSession.OpenMessageReceiverLinkAsync(DeviceIdentity deviceIdentity, TimeSpan timeout)
       at Microsoft.Azure.Devices.Client.Transport.AmqpIoT.AmqpUnit.EnsureMessageReceivingLinkAsync(TimeSpan timeout)
       at Microsoft.Azure.Devices.Client.Transport.AmqpIoT.AmqpUnit.ReceiveMessageAsync(TimeSpan timeout)
       at Microsoft.Azure.Devices.Client.Transport.Amqp.AmqpTransportHandler.ReceiveAsync(TimeoutHelper timeoutHelper)
       at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.ExecuteWithErrorHandlingAsync[T](Func`1 asyncOperation)
       at Microsoft.Azure.Devices.Client.Transport.RetryDelegatingHandler.<>c__DisplayClass18_0.<<ReceiveAsync>b__0>d.MoveNext()
    --- End of stack trace from previous location where exception was thrown ---
       at Microsoft.Azure.Devices.Client.Transport.RetryDelegatingHandler.ReceiveAsync(TimeoutHelper timeoutHelper)
       at Microsoft.Azure.Devices.Client.InternalClient.ReceiveAsync(TimeSpan timeout)
       at 
    RecieveMessage(String correlationId) in ....file.cs:line 487

What could be the reason for this?

Is InitializeDeviceClient somehow being called multiple times?

The implementation of that method is as follows:

public void InitializeDeviceClient(string connType)
{
    bool setFlag = _fixture.UseCertificate;
    if (setFlag)
    {
        try
        {
            // Load the first file found in the certificate directory next to the executing assembly.
            var initialPath = Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), Constants.PathCertPath.ToString());
            DirectoryInfo hdDirectoryInWhichToSearch = new DirectoryInfo(initialPath);
            FileInfo[] filesInDir = hdDirectoryInWhichToSearch.GetFiles("*");
            var cert = new X509Certificate2(File.ReadAllBytes(Path.Combine(initialPath, filesInDir[0].ToString())),
                "123",
                X509KeyStorageFlags.UserKeySet);
            // The device ID is the certificate file name without the .pfx extension.
            var auth = new DeviceAuthenticationWithX509Certificate(Path.GetFileName(filesInDir[0].ToString().Replace(".pfx", "")), cert);
            // Create a client only if this instance does not already hold one.
            if (deviceClient == null)
            {
                switch (connType.ToUpper())
                {
                    case ConnectionTypes.Amqp:
                        deviceClient = DeviceClient.Create(_fixture.IoTHubHostName, auth, Microsoft.Azure.Devices.Client.TransportType.Amqp_Tcp_Only);
                        break;
                    case ConnectionTypes.Mqtt:
                        deviceClient = DeviceClient.Create(_fixture.IoTHubHostName, auth, Microsoft.Azure.Devices.Client.TransportType.Mqtt);
                        break;
                    case ConnectionTypes.Https:
                        deviceClient = DeviceClient.Create(_fixture.IoTHubHostName, auth, Microsoft.Azure.Devices.Client.TransportType.Http1);
                        break;
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error in sample: {0}", ex.Message);
        }
    }
}

Question from: https://stackoverflow.com/questions/65951951/azure-iothubexception-comes-as-the-connection-was-closed-because-another-amqp-c

1 Answer

This happens because the existing connection is never closed before a new one is created. Your error message reports a 409002 LinkCreationConflict error and, as it states, only one connection is allowed per client identity.
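
One way to avoid recreating the connection is to share a single client per device identity for the lifetime of the process. The following is only a minimal sketch, not the asker's code: the class name SharedDeviceClient and the environment variable IOTHUB_DEVICE_CONNECTION_STRING are hypothetical, and it authenticates with a connection string instead of the X.509 certificate from the question to keep the example short. It assumes the Microsoft.Azure.Devices.Client NuGet package is referenced.

```csharp
using System;
using Microsoft.Azure.Devices.Client;

// Hypothetical holder that creates at most one DeviceClient per process.
public static class SharedDeviceClient
{
    // Assumption: the device connection string is supplied through this
    // (made-up) environment variable.
    private const string ConnectionStringVariable = "IOTHUB_DEVICE_CONNECTION_STRING";

    // Lazy<T> is thread-safe by default, so concurrent callers all get the
    // same instance instead of each opening its own AMQP connection.
    private static readonly Lazy<DeviceClient> LazyClient =
        new Lazy<DeviceClient>(CreateClient);

    public static DeviceClient Instance => LazyClient.Value;

    private static DeviceClient CreateClient()
    {
        string connectionString = Environment.GetEnvironmentVariable(ConnectionStringVariable);
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException($"{ConnectionStringVariable} is not set.");
        }

        return DeviceClient.CreateFromConnectionString(connectionString, TransportType.Amqp_Tcp_Only);
    }
}
```

Sharing one instance only helps within a single process. If several processes or build agents use the same device identity at the same time, they can still displace each other, since only one connection is allowed per client identity.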

You can close the connection after use, or in your Dispose method, by calling await deviceClient.CloseAsync().ConfigureAwait(false) on the client instance.
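
A minimal sketch of that suggestion, assuming the caller owns the client and is finished with it once the expected message has arrived. The class and method names are hypothetical; ReceiveAsync, CompleteAsync, CloseAsync and Dispose are members of DeviceClient in the Microsoft.Azure.Devices.Client package.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;

public static class ReceiveThenClose
{
    // Hypothetical helper: waits for the message with the given correlation ID,
    // then always closes the client, even if receiving throws.
    public static async Task<string> ReceiveAndCloseAsync(DeviceClient deviceClient, string correlationId)
    {
        try
        {
            while (true)
            {
                // Returns null when no message arrives within the timeout.
                Message received = await deviceClient
                    .ReceiveAsync(TimeSpan.FromSeconds(120))
                    .ConfigureAwait(false);
                if (received == null)
                {
                    continue;
                }

                using (received)
                {
                    await deviceClient.CompleteAsync(received).ConfigureAwait(false);
                    if (received.CorrelationId == correlationId)
                    {
                        return Encoding.UTF8.GetString(received.GetBytes());
                    }
                }
            }
        }
        finally
        {
            // Release the AMQP connection so a later client for the same
            // device identity does not conflict with this one.
            await deviceClient.CloseAsync().ConfigureAwait(false);
            deviceClient.Dispose();
        }
    }
}
```

In the question's code, the deviceClient field would also need to be set back to null after closing, because InitializeDeviceClient only creates a client when the field is null; otherwise the next call would try to reuse a closed and disposed client.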

Let me know if this solves your problem.

