Duplicate values in PiDataPipe.

We have a gas chromatograph that processes results ~ every 10 minutes. When it has finished a cycle, I want to run an analysis. I figured that I'd set-up a PiDataPipe along with IObserver to watch the various component tags, and when new values are received, run the analysis. All good in theory!

I set up the PiDataPipe, and subscribed to the Sinusoid point. Seemed to work OK. The following is the output - it uses a timer to retrieve the results ever 60 seconds, and each "group" comprises the observations received in that period.

11:40:36:068	====================GROUP=====================
11:40:36:068	Update	6/05/2025 11:39:49 am	41.23856
11:40:36:068	Update	6/05/2025 11:40:19 am	41.45343
11:40:36:068	==============================================
11:41:36:123	====================GROUP=====================
11:41:36:123	Update	6/05/2025 11:40:49 am	41.66847
11:41:36:123	Update	6/05/2025 11:41:19 am	41.88366
11:41:36:123	==============================================
11:42:36:120	====================GROUP=====================
11:42:36:120	Update	6/05/2025 11:41:49 am	42.09901
11:42:36:120	Update	6/05/2025 11:42:19 am	42.31451
11:42:36:120	==============================================
11:43:36:160	====================GROUP=====================
11:43:36:160	Update	6/05/2025 11:42:49 am	42.53016
11:43:36:160	Update	6/05/2025 11:43:19 am	42.74594
11:43:36:160	==============================================

However, when I run it on the component tag, the data is different:

11:19:18:386	====================GROUP=====================
11:19:18:386	Update	6/05/2025 11:19:10 am	78.05022
11:19:18:386	Update	6/05/2025 11:19:15 am	78.04510
11:19:18:386	==============================================
11:24:48:680	====================GROUP=====================
11:24:48:680	Update	6/05/2025 11:24:35 am	78.04510
11:24:48:680	Update	6/05/2025 11:24:40 am	78.05621
11:24:48:680	==============================================
11:30:18:932	====================GROUP=====================
11:30:18:932	Update	6/05/2025 11:30:05 am	78.05621
11:30:18:932	Update	6/05/2025 11:30:10 am	78.08100
11:30:18:932	==============================================
11:35:49:132	====================GROUP=====================
11:35:49:132	Update	6/05/2025 11:35:40 am	78.08100
11:35:49:132	Update	6/05/2025 11:35:45 am	78.05988
11:35:49:132	==============================================

The GC is only writing the composition ~ 5.5 minutes, and you can see that when a "group" is written, the first value is the repeat of the second value from the previous timestamp, and the second value (with a timestamp 5 seconds later) is the 'new' value.

Here are the values retrieved from Pi SMT Archive Editor


image.png
You can see that the second value in each group (and its timestamp) match perfectly to the PI SMT data archive. However the first value in each group is a duplicate of the previous time, isn't in the Archive, and has a timestamp 5 seconds earlier.

Now: The main difference I can see in the configuration between the sinusoid and the component tags is that for the composition tag, "Step" is set to true. So I tried setting the sinusoid tag to 'true' and rerunning, but it is still working fine:

11:50:53:998	====================GROUP=====================
11:50:53:998	Update	6/05/2025 11:50:19 am	45.77961
11:50:53:998	Update	6/05/2025 11:50:49 am	45.99703
11:50:53:998	==============================================
11:51:54:034	====================GROUP=====================
11:51:54:034	Update	6/05/2025 11:51:19 am	46.21453
11:51:54:034	Update	6/05/2025 11:51:49 am	46.43211
11:51:54:034	==============================================
11:52:54:059	====================GROUP=====================
11:52:54:059	Update	6/05/2025 11:52:19 am	46.64975
11:52:54:059	Update	6/05/2025 11:52:49 am	46.86746
11:52:54:059	==============================================
11:53:54:099	====================GROUP=====================
11:53:54:099	Update	6/05/2025 11:53:19 am	47.08523
11:53:54:099	Update	6/05/2025 11:53:49 am	47.30305
11:53:54:099	==============================================

For what it's worth, here's the code I'm using.

internal class PiWatcher
    {
        #region Server Fields
        private PISystems PISystems { get; set; }
        public PIServer PIServer { get; set; }
        #endregion

        #region DataPipe fields
        private PIDataPipe PiPipe { get; set; }
        private PiDataPipeObserver PiPointObserver { get; set; }
        private System.Timers.Timer WatchTimer { get; set; } = new System.Timers.Timer();

        PIPoint WatchedPoint = null;
        #endregion

        public PiWatcher(string serverName)
        {
            //Connect to the server
            PISystems = new PISystems();
            PIServer = PIServers.GetPIServers()[serverName];
            PIServer.Connect();

            //Find the point to watch
            WatchedPoint = PIPoint.FindPIPoint(PIServer, "SINUSOID");

            //Create a snapshot datapipe and observer
            PiPipe = new PIDataPipe(AFDataPipeType.Snapshot);
            PiPointObserver = new PiDataPipeObserver();
            PiPipe.Subscribe(PiPointObserver);

            //Add the watched point
            PiPipe.AddSignups(new List<PIPoint> { WatchedPoint });
            
            //Configure the timer
            WatchTimer = new System.Timers.Timer();
            WatchTimer.Interval = 60000;
            WatchTimer.Elapsed += WatchTimer_Elapsed;

            //Go!
            WatchTimer.Start();

        }

        private void WatchTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            AFErrors<PIPoint> PiDataPipeObserverErrors = PiPipe.GetObserverEvents(1000, out bool hasMore);
            AFListResults<PIPoint, AFDataPipeEvent> piPointResults = PiPointObserver.Results;
            if (piPointResults.Count > 0)
            {
                Debug.WriteLine("====================GROUP=====================");
                foreach (var result in piPointResults)
                {
                    Debug.WriteLine(result.Action.ToString() + "\t" + result.Value.Timestamp + "\t" + result.Value.DisplayValue(5));
                }
                Debug.WriteLine("==============================================");
                PiPointObserver.Results.Results.Clear();

            }
        }
    }


    #region IObserver Implementation - PiDataPipeObserver
    /// <summary>
    /// https://www.youtube.com/watch?v=u1S_ecFbKFo?t=1:07:27
    /// https://docs.microsoft.com/en-us/dotnet/standard/events/how-to-implement-an-observer
    /// </summary>
    internal class PiDataPipeObserver : IObserver<AFDataPipeEvent>
    {
        private IDisposable unsubscriber;
        public AFListResults<PIPoint, AFDataPipeEvent> Results = new AFListResults<PIPoint, AFDataPipeEvent>();

        public virtual void Subscribe(IObservable<AFDataPipeEvent> provider)
        {
            unsubscriber = provider.Subscribe(this);
        }

        public virtual void Unsubscribe()
        {
            unsubscriber.Dispose();
        }

        public void OnCompleted()
        {
            //Do Nothing
        }

        public void OnError(Exception error)
        {
            //Do Nothing
        }

        public void OnNext(AFDataPipeEvent value)
        {
            Results.AddResult(value);
        }
    }

    #endregion

 

 Edit - a workaround I tried was to check each value received to see if it was different from the previous value, and discarding it if it was a duplicate. This is unsatisfactory, because two sequential identical values are possible (eg. zero!) I would prefer a more robust solution - only getting the 'new' values.

 

 

 

 

Parents
  • A few things for your consideration.

     

    In strict use of the PI terminology, a "Duplicate Value" actually means the same Timestamp AND Value. A duplicate event may mean the same timestamp but the Value properties may differ.

     

    Your constructor for the PIDataPipe uses a type of Snapshot. That means anytime new data arrives to the Snapshot subsystem that that new arriving value is also sent to your data pipe, while the old snapshot is sent to the Compression subsystem to determine if it should make to the Archive subsystem. Far from being a surprise, it should be considered totally normal that certain values your observer in your data pipe do not make it to archived or recorded values.

     

    The SINUSOID tag receives updates to the snapshot every half-minute or so BUT the default compression is kind of loose so many of those snapshots will not make it to the archives. Your GC produces new values every 10 minutes. What sounds like is happening is more a "feature" of your gas chromatograph than of anything PI related - I would surmise that while a new GC value takes 10 minutes to generate that the chromatograph seems to be updating the timestamp of the older value, producing the illusion that is have 5-minute updates. However, since PI receives an old value BUT with a new timestamp, PI is correct in pushing it to the Snapshot subsystem, and therefore to your data pipe.

Reply
  • A few things for your consideration.

     

    In strict use of the PI terminology, a "Duplicate Value" actually means the same Timestamp AND Value. A duplicate event may mean the same timestamp but the Value properties may differ.

     

    Your constructor for the PIDataPipe uses a type of Snapshot. That means anytime new data arrives to the Snapshot subsystem that that new arriving value is also sent to your data pipe, while the old snapshot is sent to the Compression subsystem to determine if it should make to the Archive subsystem. Far from being a surprise, it should be considered totally normal that certain values your observer in your data pipe do not make it to archived or recorded values.

     

    The SINUSOID tag receives updates to the snapshot every half-minute or so BUT the default compression is kind of loose so many of those snapshots will not make it to the archives. Your GC produces new values every 10 minutes. What sounds like is happening is more a "feature" of your gas chromatograph than of anything PI related - I would surmise that while a new GC value takes 10 minutes to generate that the chromatograph seems to be updating the timestamp of the older value, producing the illusion that is have 5-minute updates. However, since PI receives an old value BUT with a new timestamp, PI is correct in pushing it to the Snapshot subsystem, and therefore to your data pipe.

Children
No Data