How to build Machine Learning with OSIsoft PI and Python.

Python-based machine learning (ML) libraries have evolved at a remarkable pace. Most impressively, time-consuming steps such as data encoding, feature selection, model comparison, and even model optimization have been fully automated. For example, the relatively new Python library PyCaret calculates the metrics of over 21 different regression models and selects the best one with just a few lines of code. Machine learning with OSIsoft PI has come a long way.

There are plenty of industrial applications where these algorithms could be successfully applied. But two major bottlenecks stand in the way of successful projects:

  1. Historical data collection for model development
  2. Real-time data collection for model integration

Model development data can be downloaded into Excel or text/CSV files and analyzed offline. The drawback is that this approach cannot be productized and is limited to offline applications.

To accelerate model development and model integration (MD/MI pipelines) for the OSIsoft PI System, TQS has developed a Python library called TQS Pandas PiFrames for OSIsoft® PI System® that connects to the PI System and provides PI data as Pandas data frames. The Pandas data frame is the preferred data structure in Python for data scientists and is supported by many ML libraries. Therefore, TQS Pandas PiFrames for OSIsoft® PI System® can be easily integrated into ML projects in both model development and model integration.

The following shows some code examples in Python.

  1. Connecting to the PI Data Historian and PI System:


cdf = ConnectToDefaultAF()
cdf = ConnectToDefaultPI()


df = GetMultipleAttributeValuesByVariable("Bio Reactor 1",["Temperature","Concentration","Level"],'t-2h','t',60,0,None)

The resulting data frame is a time series:


The data frame can also be arranged by variable columns:

df = GetMultipleAttributeValuesByFrame("Batch_0_*","Bio Reactor 1",["Temperature","Concentration","Level"],'t-7d','t',60,0,None)

Over the last couple of months, we have developed use cases around the OSIsoft PI System that are based on the TQS Pandas PiFrames for OSIsoft® PI System® library.

The library has been shown to significantly reduce model development and model integration time.

SUMMARY

Machine learning and AI projects are often slow to develop and difficult to integrate. The main reason is that most Python libraries expect Pandas data frames (or NumPy arrays), and these data structures are not readily available in industrial automation. TQS Integration has developed the TQS Pandas PiFrames for OSIsoft® PI System® library to accelerate both model development and model integration. The library is user friendly, fast, and scales well for all common machine learning (ML) applications.

For more information, please contact us.

Data Latency

The topic of system latency has come up a couple of times in recent projects. If you think about it, this is not surprising. As more manufacturing systems get integrated, data must be synchronized and/or orchestrated between different applications. Here are just some examples:

  1. MES: Manufacturing execution systems typically connect to a variety of data sources, so the workflow developer needs to know the timeout settings for the different applications. Connections to the automation system will have very low latency, but what is the expected data latency of the historian?
  2. Analysis: More and more companies are moving towards real-time analytics. But just how fast can you really expect calculations to be updated? This question is especially relevant for enterprise-level systems, which are typically cloned from source OSIsoft PI servers by way of PI-to-PI. A typical data flow looks like this, with latency added at each step:

    Source -> PI Data Archive (local) -> PI-to-PI -> PI Data Archive (region) -> PI-to-PI -> PI Data Archive (enterprise)
  3. Reports: One example is product release reports. How long do you need to wait to make sure that all data have been collected?
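
To make the Analysis example concrete, here is a minimal Python sketch of how latency accumulates along a sequential data flow. The per-hop numbers are hypothetical placeholders, not measurements, and the helper name cumulative_latency is ours; replace the values with what you measure on your own system.

```python
# Hypothetical per-hop latencies in seconds (placeholders, not measurements).
hops = [
    ("Source -> PI Data Archive (local)", 0.5),
    ("PI-to-PI -> PI Data Archive (region)", 1.2),
    ("PI-to-PI -> PI Data Archive (enterprise)", 1.3),
]


def cumulative_latency(hops):
    """Return (hop name, running total in seconds) for a sequential data flow."""
    total = 0.0
    result = []
    for name, seconds in hops:
        total += seconds  # every sequential step adds to the overall latency
        result.append((name, total))
    return result


for name, total in cumulative_latency(hops):
    print(f"{name}: {total:.1f} s after this hop")
```

The running total after the last hop is the earliest point at which an application on the enterprise server can expect to see a value that was time-stamped at the source.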

The OSIsoft PI time series object carries a time stamp that is typically provided by the source system. This time stamp will bubble up through interfaces and data archives unchanged. This makes sense when you compare historical data, but it will mask the latency in your data.

To detect when a data point gets queued and recorded at the data server, PI offers two event queues that can be monitored:

AFDataPipeType.Snapshot ... to monitor the snapshot queue

AFDataPipeType.Archive ... to monitor the archive queue

You can use PowerShell scripts, which have the advantage of being lightweight and can be combined with the existing OSIsoft PowerShell library. PowerShell is also available on most servers, so you don't need a separate development environment for code changes.

The first step is to connect to the OSIsoft PI Server using the AFSDK:

function Connect-PIServer {
    [OutputType('OSIsoft.AF.PI.PIServer')]
    param (
        [string]
        [Parameter(Mandatory=$true, Position=0, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $PIServerName
    )
    # load the AF SDK assembly from the PI installation folder
    $Library = $env:PIHOME + "\AF\PublicAssemblies\OSIsoft.AFSDK.dll"
    Add-Type -Path $Library
    # look up the server by name and open the connection
    $PIServer = [OSIsoft.AF.PI.PIServer]::FindPIServer($PIServerName)
    $PIServer.Connect()
    Write-Output($PIServer)
}

The function opens a connection to the server and returns the .NET object.

The following functions resolve a PI point, monitor both queues, and write out the events:

function Get-PointReference {
    param (
        [PSTypeName('OSIsoft.AF.PI.PIServer')]
        [Parameter(Mandatory=$true, Position=0, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $PIServer,
        [string]
        [Parameter(Mandatory=$true, Position=1, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $PIPointName
    )
    # resolve the point name to a PIPoint object on the given server
    $PIPoint = [OSIsoft.AF.PI.PIPoint]::FindPIPoint($PIServer, $PIPointName)
    Write-Output($PIPoint)
}

function Get-QueueValues {
    param (
        [PSTypeName('OSIsoft.AF.PI.PIPoint')]
        [Parameter(Mandatory=$true, Position=0, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $PIPoint,
        [double]
        [Parameter(Mandatory=$true, Position=1, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $DurationInSeconds
    )
    # put the PI point into a .NET list
    $PIPointList = New-Object System.Collections.Generic.List[OSIsoft.AF.PI.PIPoint]
    $PIPointList.Add($PIPoint)
    # create the pipelines
    $ArchivePipeline = [OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Archive)
    $SnapShotPipeline = [OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Snapshot)
    # add signups
    $ArchivePipeline.AddSignups($PIPointList)
    $SnapShotPipeline.AddSignups($PIPointList)
    # now the polling
    $EndTime = (Get-Date).AddSeconds($DurationInSeconds)
    while ((Get-Date) -lt $EndTime) {
        $ArchiveEvents = $ArchivePipeline.GetUpdateEvents(1000)
        $SnapShotEvents = $SnapShotPipeline.GetUpdateEvents(1000)
        # local time at which the events were picked up from the queues
        $RecordedTime = (Get-Date)
        # format output:
        foreach ($ArchiveEvent in $ArchiveEvents) {
            $AFEvent = New-Object PSObject -Property @{
                Name = $ArchiveEvent.Value.PIPoint.Name
                Type = "ArchiveEvent"
                Action = $ArchiveEvent.Action
                TimeStamp = $ArchiveEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                Value = $ArchiveEvent.Value.Value.ToString()
            }
            $AFEvent.pstypenames.Add('My.DataQueueItem')
            Write-Output($AFEvent)
        }
        foreach ($SnapShotEvent in $SnapShotEvents) {
            $AFEvent = New-Object PSObject -Property @{
                Name = $SnapShotEvent.Value.PIPoint.Name
                Type = "SnapShotEvent"
                Action = $SnapShotEvent.Action
                TimeStamp = $SnapShotEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                Value = $SnapShotEvent.Value.Value.ToString()
            }
            $AFEvent.pstypenames.Add('My.DataQueueItem')
            Write-Output($AFEvent)
        }
        # 150 ms delay
        Start-Sleep -Milliseconds 150
    }
    $ArchivePipeline.Dispose()
    $SnapShotPipeline.Dispose()
}

These functions are all you need to monitor events coming into a single server. The data latency is simply the difference between the value's time stamp and the time recorded.

Measuring the data latency between two servers, for example a local and an enterprise server, can be done the same way. You just need two server objects and then monitor the snapshot (or archive) events on each.
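
As a minimal sketch of that difference in Python, assume the events written by Get-QueueValues have been exported (for example to CSV or JSON) and that TimeStamp and QueueTime keep the string format used in the script. The helper name latency_seconds and the sample event are hypothetical.

```python
from datetime import datetime

# Matches the .NET format "yyyy-MM-dd HH:mm:ss.fff" used in the PowerShell script.
FMT = "%Y-%m-%d %H:%M:%S.%f"


def latency_seconds(time_stamp, queue_time):
    """Seconds between the value's time stamp and the time it was recorded."""
    recorded = datetime.strptime(queue_time, FMT)
    stamped = datetime.strptime(time_stamp, FMT)
    return (recorded - stamped).total_seconds()


# Hypothetical exported event (illustrative values only).
event = {
    "Name": "sinusoid",
    "Type": "SnapShotEvent",
    "TimeStamp": "2021-03-01 10:00:00.000",
    "QueueTime": "2021-03-01 10:00:00.450",
}

print(event["Type"], latency_seconds(event["TimeStamp"], event["QueueTime"]), "s")
```

Two caveats apply to any number computed this way. The script polls with a 150 ms delay, so each measurement can be overstated by roughly one polling interval. And because the time stamp comes from the source system while the recorded time comes from the machine running the script, any clock offset between the two ends up in the result.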

function Get-Server2ServerLatency {
    param (
        [PSTypeName('OSIsoft.AF.PI.PIPoint')]
        [Parameter(Mandatory=$true, Position=0, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $SourcePoint,
        [PSTypeName('OSIsoft.AF.PI.PIPoint')]
        [Parameter(Mandatory=$true, Position=1, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $TargetPoint,
        [double]
        [Parameter(Mandatory=$true, Position=2, ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
        $DurationInSeconds
    )
    # put the source and target points into .NET lists
    $SourceList = New-Object System.Collections.Generic.List[OSIsoft.AF.PI.PIPoint]
    $SourceList.Add($SourcePoint)
    $TargetList = New-Object System.Collections.Generic.List[OSIsoft.AF.PI.PIPoint]
    $TargetList.Add($TargetPoint)
    # create the pipelines
    $SourcePipeline = [OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Snapshot)
    $TargetPipeline = [OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Snapshot)
    # add signups
    $SourcePipeline.AddSignups($SourceList)
    $TargetPipeline.AddSignups($TargetList)
    # now the polling
    $EndTime = (Get-Date).AddSeconds($DurationInSeconds)
    while ((Get-Date) -lt $EndTime) {
        $SourceEvents = $SourcePipeline.GetUpdateEvents(1000)
        $TargetEvents = $TargetPipeline.GetUpdateEvents(1000)
        # local time at which the events were picked up from the queues
        $RecordedTime = (Get-Date)
        # format output:
        foreach ($SourceEvent in $SourceEvents) {
            $AFEvent = New-Object PSObject -Property @{
                Name = $SourceEvent.Value.PIPoint.Name
                Type = "SourceEvent"
                Action = $SourceEvent.Action
                TimeStamp = $SourceEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                Value = $SourceEvent.Value.Value.ToString()
            }
            $AFEvent.pstypenames.Add('My.DataQueueItem')
            Write-Output($AFEvent)
        }
        foreach ($TargetEvent in $TargetEvents) {
            $AFEvent = New-Object PSObject -Property @{
                Name = $TargetEvent.Value.PIPoint.Name
                Type = "TargetEvent"
                Action = $TargetEvent.Action
                TimeStamp = $TargetEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                Value = $TargetEvent.Value.Value.ToString()
            }
            $AFEvent.pstypenames.Add('My.DataQueueItem')
            Write-Output($AFEvent)
        }
        # 150 ms delay
        Start-Sleep -Milliseconds 150
    }
    $SourcePipeline.Dispose()
    $TargetPipeline.Dispose()
}

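Here is a quick test of a PI-to-PI interface reading from and writing to the same server. The function expects PI point objects, so the point names are resolved with Get-PointReference first:

Get-Server2ServerLatency (Get-PointReference $srv sinusoid) (Get-PointReference $srv sinusclone) 30

In this test, the difference between target and source was a bit over 1 second, which is to be expected since the scan rate is 1 second.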
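
The source and target events can also be paired automatically. The Python sketch below assumes the output of Get-Server2ServerLatency has been exported as a list of records, and that the value's time stamp is passed through unchanged, as described earlier, so it can serve as the matching key. The function name server_to_server_latency and the sample records are hypothetical.

```python
from datetime import datetime

# Matches the .NET format "yyyy-MM-dd HH:mm:ss.fff" used in the PowerShell script.
FMT = "%Y-%m-%d %H:%M:%S.%f"


def server_to_server_latency(events):
    """Pair SourceEvent and TargetEvent records by value time stamp.

    Returns a list of (time stamp, seconds between source and target queue times).
    """
    # Remember when each value was seen on the source server.
    source_seen = {}
    for e in events:
        if e["Type"] == "SourceEvent":
            source_seen[e["TimeStamp"]] = datetime.strptime(e["QueueTime"], FMT)

    # For each target event, look up the matching source event.
    latencies = []
    for e in events:
        if e["Type"] == "TargetEvent" and e["TimeStamp"] in source_seen:
            target_seen = datetime.strptime(e["QueueTime"], FMT)
            delta = (target_seen - source_seen[e["TimeStamp"]]).total_seconds()
            latencies.append((e["TimeStamp"], delta))
    return latencies


# Hypothetical exported records (illustrative values only).
events = [
    {"Type": "SourceEvent", "TimeStamp": "2021-03-01 10:00:00.000",
     "QueueTime": "2021-03-01 10:00:00.200"},
    {"Type": "TargetEvent", "TimeStamp": "2021-03-01 10:00:00.000",
     "QueueTime": "2021-03-01 10:00:01.300"},
]

for time_stamp, seconds in server_to_server_latency(events):
    print(time_stamp, f"{seconds:.3f} s")
```

Because both queue times are taken on the machine running the script, this comparison is not affected by clock differences between the servers.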

SUMMARY

Data latency is a key metric for every system that captures, stores, analyzes, or processes data. Every sequential operation adds to the overall system latency and must be accounted for. Data transport over networks is not the only major contributor: data queues that package data into messages also add significant delays. This topic is especially important for cloud-based systems that rely on on-premises sensor data.

As shown in this blog, data latency can and should be measured, and it should be part of the architectural planning process. As a rule of thumb, sub-second data latencies are challenging, especially when the number of data sources increases.

Please contact us for more information.

Which is better? On-Premise or Cloud Based Industrial Internet of Things data flow.

Applications around the Industrial Internet of Things (IIoT) have mushroomed, and each one comes with a different set of capabilities and features. So how do you compare different applications or services? And how does the new solution fit into your existing data architecture?

In general, industrial internet of things architectures fall into three categories: (1) on-premises, (2) cloud-based, or (3) a hybrid of the two. In the on-premises solution, data never leave the manufacturing network, whereas in the cloud solution all data are sent directly to the cloud. In the hybrid solution, a subset of the data is replicated to the cloud and used for analysis.

Industrial Internet of Things data flow.

Today, many industrial internet of things applications fall into the hybrid category and lead to a scenario where some applications will execute on-premise and others in the cloud. To choose the right blend of on-premise and cloud functionality, let’s consider the following key metrics:

For regulated industries, there is often a requirement that the compressed timeseries is identical between two components.

For a sequential system, the overall reliability R is the product of the component reliabilities:

R = R1 × R2 × R3 × ... × Rn = Π Ri

As an example, if a system has four components with a reliability of 95% each, the overall reliability drops to 81.5%.

Making the same system redundant, with the redundant systems operating in parallel, increases the overall system reliability to:

R = 1 - (1-R1) × (1-R2) × (1-R3) × ... × (1-Rn) = 1 - Π (1-Ri)

or 96.6% for two redundant systems with Ri = 81.5% each.
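
The two formulas can be checked with a few lines of Python. This is a minimal sketch; the function names are ours, and the redundant case assumes two identical copies of the four-component chain running in parallel.

```python
from math import prod


def series_reliability(reliabilities):
    """Sequential system: every component must work, so reliabilities multiply."""
    return prod(reliabilities)


def parallel_reliability(reliabilities):
    """Redundant system: it fails only if every parallel branch fails."""
    return 1.0 - prod(1.0 - r for r in reliabilities)


# Four sequential components with 95% reliability each.
chain = series_reliability([0.95] * 4)

# Assumption: two identical chains operated redundantly (in parallel).
redundant = parallel_reliability([chain, chain])

print(f"sequential: {chain:.1%}")
print(f"redundant:  {redundant:.1%}")
```

The same two functions can be nested to evaluate mixed architectures, for example a redundant historian pair followed by a single cloud connector.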


HighByte provides in-flight data contextualization at the edge. This opens the door for very flexible and dynamic solutions.

Most of the protocols are equipment-centric and lack relational information (one-to-many and many-to-one) and time segmentation. Microsoft’s Digital Twins Definition Language (DTDL) is a relatively new approach that has the potential to bridge the gap.

Summary

Industrial internet of things apps range from pure on-premises to all cloud-based solutions. On-premises architectures typically provide higher system reliability and lower latency, while cloud-based solutions offer scalability, flexibility, and a wide range of readily accessible data analytics. As a result, manufacturing IT will most likely have a blend of both, where process-level analysis runs on-premises and enterprise-level analytics run in the cloud.

Current connectors do not provide the complete manufacturing process model, industrial-strength data compression, and redundancy necessary to integrate seamlessly into existing on-premises data architectures. But this is changing quickly, and new in-flight contextualization approaches are closing the gap. The goal is to better understand and utilize industrial internet of things data.

Please contact us for more information.
