Duplicate values in PIDataPipe

We have a gas chromatograph that processes results roughly every 10 minutes. When it has finished a cycle, I want to run an analysis. I figured that I'd set up a PIDataPipe along with an IObserver to watch the various component tags and run the analysis when new values are received. All good in theory!

I set up the PIDataPipe and subscribed to the SINUSOID point, which seemed to work OK. The following is the output. The code uses a timer to retrieve the results every 60 seconds, and each "group" comprises the observations received in that period.

11:40:36:068	====================GROUP=====================
11:40:36:068	Update	6/05/2025 11:39:49 am	41.23856
11:40:36:068	Update	6/05/2025 11:40:19 am	41.45343
11:40:36:068	==============================================
11:41:36:123	====================GROUP=====================
11:41:36:123	Update	6/05/2025 11:40:49 am	41.66847
11:41:36:123	Update	6/05/2025 11:41:19 am	41.88366
11:41:36:123	==============================================
11:42:36:120	====================GROUP=====================
11:42:36:120	Update	6/05/2025 11:41:49 am	42.09901
11:42:36:120	Update	6/05/2025 11:42:19 am	42.31451
11:42:36:120	==============================================
11:43:36:160	====================GROUP=====================
11:43:36:160	Update	6/05/2025 11:42:49 am	42.53016
11:43:36:160	Update	6/05/2025 11:43:19 am	42.74594
11:43:36:160	==============================================

However, when I run it on the component tag, the data is different:

11:19:18:386	====================GROUP=====================
11:19:18:386	Update	6/05/2025 11:19:10 am	78.05022
11:19:18:386	Update	6/05/2025 11:19:15 am	78.04510
11:19:18:386	==============================================
11:24:48:680	====================GROUP=====================
11:24:48:680	Update	6/05/2025 11:24:35 am	78.04510
11:24:48:680	Update	6/05/2025 11:24:40 am	78.05621
11:24:48:680	==============================================
11:30:18:932	====================GROUP=====================
11:30:18:932	Update	6/05/2025 11:30:05 am	78.05621
11:30:18:932	Update	6/05/2025 11:30:10 am	78.08100
11:30:18:932	==============================================
11:35:49:132	====================GROUP=====================
11:35:49:132	Update	6/05/2025 11:35:40 am	78.08100
11:35:49:132	Update	6/05/2025 11:35:45 am	78.05988
11:35:49:132	==============================================

The GC only writes the composition roughly every 5.5 minutes, and you can see that in each "group" the first value is a repeat of the second value from the previous group, while the second value (with a timestamp 5 seconds later) is the 'new' value.

Here are the values retrieved from the PI SMT Archive Editor:

[image.png]
You can see that the second value in each group (and its timestamp) matches the PI SMT archive data exactly. However, the first value in each group is a duplicate of the previous group's new value, isn't in the archive, and has a timestamp 5 seconds earlier than the new value.

Now, the main difference I can see in configuration between the SINUSOID tag and the component tag is that the component tag has "Step" set to true. So I tried setting "Step" to true on the SINUSOID tag and rerunning, but it still works fine:

11:50:53:998	====================GROUP=====================
11:50:53:998	Update	6/05/2025 11:50:19 am	45.77961
11:50:53:998	Update	6/05/2025 11:50:49 am	45.99703
11:50:53:998	==============================================
11:51:54:034	====================GROUP=====================
11:51:54:034	Update	6/05/2025 11:51:19 am	46.21453
11:51:54:034	Update	6/05/2025 11:51:49 am	46.43211
11:51:54:034	==============================================
11:52:54:059	====================GROUP=====================
11:52:54:059	Update	6/05/2025 11:52:19 am	46.64975
11:52:54:059	Update	6/05/2025 11:52:49 am	46.86746
11:52:54:059	==============================================
11:53:54:099	====================GROUP=====================
11:53:54:099	Update	6/05/2025 11:53:19 am	47.08523
11:53:54:099	Update	6/05/2025 11:53:49 am	47.30305
11:53:54:099	==============================================

For what it's worth, here's the code I'm using.

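using System;
using System.Collections.Generic;
using System.Diagnostics;
using OSIsoft.AF;
using OSIsoft.AF.Data;
using OSIsoft.AF.PI;

internal class PiWatcher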
    {
        #region Server Fields
        private PISystems PISystems { get; set; }
        public PIServer PIServer { get; set; }
        #endregion

        #region DataPipe fields
        private PIDataPipe PiPipe { get; set; }
        private PiDataPipeObserver PiPointObserver { get; set; }
        private System.Timers.Timer WatchTimer { get; set; } = new System.Timers.Timer();

        PIPoint WatchedPoint = null;
        #endregion

        public PiWatcher(string serverName)
        {
            //Connect to the server
            PISystems = new PISystems();
            PIServer = PIServers.GetPIServers()[serverName];
            PIServer.Connect();

            //Find the point to watch
            WatchedPoint = PIPoint.FindPIPoint(PIServer, "SINUSOID");

            //Create a snapshot datapipe and observer
            PiPipe = new PIDataPipe(AFDataPipeType.Snapshot);
            PiPointObserver = new PiDataPipeObserver();
            PiPipe.Subscribe(PiPointObserver);

            //Add the watched point
            PiPipe.AddSignups(new List<PIPoint> { WatchedPoint });
            
            //Configure the timer
            WatchTimer = new System.Timers.Timer();
            WatchTimer.Interval = 60000;
            WatchTimer.Elapsed += WatchTimer_Elapsed;

            //Go!
            WatchTimer.Start();

        }

        private void WatchTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            //Pump the pipe: each queued event (up to 1000 per call) is delivered to PiPointObserver.OnNext
            AFErrors<PIPoint> pipeErrors = PiPipe.GetObserverEvents(1000, out bool hasMore);
            if (pipeErrors != null)
            {
                //Report any per-point errors returned by the pipe
                Debug.WriteLine("Data pipe errors: " + pipeErrors);
            }

            AFListResults<PIPoint, AFDataPipeEvent> piPointResults = PiPointObserver.Results;
            if (piPointResults.Count > 0)
            {
                Debug.WriteLine("====================GROUP=====================");
                foreach (var result in piPointResults)
                {
                    Debug.WriteLine(result.Action.ToString() + "\t" + result.Value.Timestamp + "\t" + result.Value.DisplayValue(5));
                }
                Debug.WriteLine("==============================================");

                //Clear the buffer so the next group only contains events received after this one
                piPointResults.Results.Clear();
            }
        }
    }


    #region IObserver Implementation - PiDataPipeObserver
    /// <summary>
    /// https://www.youtube.com/watch?v=u1S_ecFbKFo?t=1:07:27
    /// https://docs.microsoft.com/en-us/dotnet/standard/events/how-to-implement-an-observer
    /// </summary>
    internal class PiDataPipeObserver : IObserver<AFDataPipeEvent>
    {
        private IDisposable unsubscriber;
        public AFListResults<PIPoint, AFDataPipeEvent> Results = new AFListResults<PIPoint, AFDataPipeEvent>();

        public virtual void Subscribe(IObservable<AFDataPipeEvent> provider)
        {
            unsubscriber = provider.Subscribe(this);
        }

        public virtual void Unsubscribe()
        {
            unsubscriber.Dispose();
        }

        public void OnCompleted()
        {
            //Do Nothing
        }

        public void OnError(Exception error)
        {
            //Do Nothing
        }

        public void OnNext(AFDataPipeEvent value)
        {
            Results.AddResult(value);
        }
    }

    #endregion

 

Edit - a workaround I tried was to compare each received value with the previous one and discard it if it was a duplicate. This is unsatisfactory because two sequential identical values are possible (e.g. zero!). I would prefer a more robust solution that only returns the 'new' values.

 

 

 

 

  • The last paragraph of your original post suggests you would prefer to receive only truly new values. For tags fed by other interfaces, this is an exercise in futility. What the interface does, and what it sends to PI and when, is beyond your control. The data pipe can only observe what has arrived in the Snapshot subsystem, and even then you have no control over what is passed on to the Compression subsystem and subsequently to the Archive subsystem.

     

    There are two ways to try to force new values only, and both are ugly.

     

    One, you could have a service with a data pipe observing archive changes, and then (YIKES) delete what you think is duplicate data. NOT RECOMMENDED.

     

    The other way is to create a brand new tag and update it only from an Analysis or another AF SDK service, making sure to write a new value only if it differs from the previously written value. Besides requiring a second tag, even this routine is fragile enough to write duplicate values under the right (weird) circumstances. As I said, futility.
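    As a rough illustration of the "only write if it differs from the previously written value" check, here is a minimal sketch. It deliberately avoids AF SDK calls so it stands on its own. The class name ChangedValueFilter, the method ShouldWrite, and the tolerance parameter are hypothetical names made up for this example, and the actual write to the second tag is left out. It has the same weakness already noted in the thread: a genuine reading that happens to equal the previous one is dropped as well.

```csharp
using System;

// Hypothetical helper (not part of the AF SDK): remembers the last value that
// was accepted and rejects any value that repeats it.
internal sealed class ChangedValueFilter
{
    private double? lastWritten;        // null until the first value is accepted
    private readonly double tolerance;  // differences up to this size count as "the same"

    public ChangedValueFilter(double tolerance = 0.0)
    {
        this.tolerance = tolerance;
    }

    // Returns true when the value differs from the last accepted value,
    // i.e. when it should be written to the second tag.
    public bool ShouldWrite(double value)
    {
        if (lastWritten.HasValue && Math.Abs(value - lastWritten.Value) <= tolerance)
        {
            return false; // repeat of the last accepted value
        }

        lastWritten = value;
        return true;
    }
}
```

    Fed the component-tag values from the question in order (78.05022, 78.04510, 78.04510, 78.05621), ShouldWrite returns true, true, false, true, so the repeated 78.04510 is the only value filtered out.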