Last active
November 29, 2016 00:29
-
-
Save WilliamBerryiii/ccd6231d6ac1952e6b072d39fda23c19 to your computer and use it in GitHub Desktop.
FS Advent - Azure IoT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Projects the listed telemetry fields from the [decompshred-in] input
-- into the [powerbi-out] output, flattening the nested location record.
SELECT
    location.Latitude,
    location.Longitude,
    windSpeed,
    deviceId,
    obsTime
INTO
    [powerbi-out]
FROM
    [decompshred-in]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hub-level settings; replace every {placeholder} with real values.
AzureIoTHubConfig:
  IoTHubUri: {hub name}.azure-devices.net
  IotHubD2CEndpoint: messages/events
  ConnectionString: HostName={hub name}.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey={my shared access key}
# One entry per device; the code reads DeviceId and Key of the first entry.
DeviceConfigs:
  - Nickname: My Test Device
    DeviceId: DeviceId
    Key: {my device key}
    Status: Enabled
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//http://madskristensen.net/post/Compress-and-decompress-strings-in-C | |
/// GZip-compresses the UTF-8 bytes of `data` and returns the result as Base64.
/// The decoded payload is a 4-byte length prefix (the uncompressed byte count,
/// as produced by BitConverter.GetBytes) followed by the gzip stream.
/// Adapted from http://madskristensen.net/post/Compress-and-decompress-strings-in-C
let compress (data : string) =
    let buffer = Encoding.UTF8.GetBytes(data)
    // `use` guarantees the backing stream is disposed even if compression throws.
    use ms = new MemoryStream()
    // The GZipStream must be disposed before ms is read so the gzip footer is
    // flushed; leaveOpen = true keeps ms usable afterwards.
    using (new GZipStream(ms, CompressionMode.Compress, true)) (fun zip ->
        zip.Write(buffer, 0, buffer.Length))
    // ToArray copies everything written, regardless of the stream position.
    let compressed = ms.ToArray()
    let gzipBuffer = Array.zeroCreate<byte> (compressed.Length + 4)
    Buffer.BlockCopy(BitConverter.GetBytes(buffer.Length), 0, gzipBuffer, 0, 4)
    Buffer.BlockCopy(compressed, 0, gzipBuffer, 4, compressed.Length)
    Convert.ToBase64String gzipBuffer
/// Inverse of `compress`: decodes the Base64 text, reads the 4-byte length
/// prefix, gunzips the remainder and returns it as a UTF-8 string.
/// Raises ArgumentException when the payload is too short to hold the prefix
/// or the prefix is negative.
let decompress (data : string) =
    let gzipBuffer = Convert.FromBase64String(data)
    if gzipBuffer.Length < 4 then
        invalidArg "data" "payload is too short to contain the 4-byte length prefix"
    let dataLength = BitConverter.ToInt32(gzipBuffer, 0)
    if dataLength < 0 then
        invalidArg "data" "length prefix is negative"
    let buffer = Array.zeroCreate<byte> dataLength
    // Wrap the compressed bytes in place instead of copying them into a new stream.
    use memoryStream = new MemoryStream(gzipBuffer, 4, gzipBuffer.Length - 4, false)
    use zip = new GZipStream(memoryStream, CompressionMode.Decompress)
    // Stream.Read may return fewer bytes than requested, so keep reading until
    // the buffer is full or the stream ends.
    let rec fill offset =
        if offset >= buffer.Length then offset
        else
            let read = zip.Read(buffer, offset, buffer.Length - offset)
            if read > 0 then fill (offset + read) else offset
    let total = fill 0
    // Decode only the bytes actually produced, never the zero-filled tail.
    Encoding.UTF8.GetString(buffer, 0, total)
/// Builds an async workflow that compresses `data`, sends it to IoT Hub through
/// `deviceClient` and logs the round-tripped payload once the send completes.
let dataSendTask (data : string) =
    async {
        let compressedData = compress data
        // Dispose the message once the awaited send has finished.
        use message = new Message(Encoding.UTF8.GetBytes(compressedData))
        // do! waits for the send; piping the Async into `ignore` (as before)
        // never observed completion or failure of the task.
        do! deviceClient.SendEventAsync(message) |> Async.AwaitTask
        printfn "%O > Sending message %s" (DateTime.Now.ToString()) (decompress compressedData)
    }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Infinite lazy sequence of batches; each batch is the "|"-joined messages
/// produced by `windSpeedMessage` for every entry of `msftSites`.
let dataStreamTasks =
    let buildBatch _ =
        msftSites
        |> Array.mapi (fun siteIndex site -> windSpeedMessage site siteIndex)
        |> String.concat "|"
    Seq.initInfinite buildBatch
/// Builds an async workflow that sends `data` (UTF-8 encoded) to IoT Hub
/// through `deviceClient` and logs the payload once the send completes.
let dataSendTask (data : string) =
    async {
        // Dispose the message once the awaited send has finished.
        use message = new Message(Encoding.UTF8.GetBytes(data))
        // do! waits for the send; piping the Async into `ignore` (as before)
        // never observed completion or failure of the task.
        do! deviceClient.SendEventAsync(message) |> Async.AwaitTask
        printfn "%O > Sending message %s" (DateTime.Now.ToString()) data
    }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Send each batch synchronously, pausing five seconds between sends.
for batch in dataStreamTasks do
    dataSendTask batch |> Async.RunSynchronously
    Async.Sleep 5000 |> Async.RunSynchronously
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// One wind-speed observation, serialized to JSON before being sent.
type telemetryDataPoint =
    { /// Coordinate the reading is attributed to.
      location : GeoCoordinate
      /// Identifier of the reporting device.
      deviceId : string
      /// Observed wind speed.
      windSpeed : float
      /// Time of the observation.
      obsTime : DateTime }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Linq | |
open System.Text | |
open System.IO | |
open System.Device.Location | |
open System.Threading | |
open FSharp.Configuration | |
open Microsoft.Azure.Devices.Client | |
open Newtonsoft.Json | |
type Config = YamlConfig<FilePath="../config/config.yaml", ReadOnly=true> | |
module Main =

    /// Loads the YAML configuration, creates a device client for the first
    /// configured device, then waits for Enter before exiting with code 0.
    [<EntryPoint>]
    let main argv =
        let config = Config()
        config.Load("../../../config/config.yaml")
        let hubHost = config.AzureIoTHubConfig.IoTHubUri
        let firstDeviceKey = config.DeviceConfigs.First().Key
        let firstDeviceId = config.DeviceConfigs.First().DeviceId
        let credentials = new DeviceAuthenticationWithRegistrySymmetricKey(firstDeviceId, firstDeviceKey)
        let deviceClient = DeviceClient.Create(hubHost, credentials)
        Console.ReadLine() |> ignore
        0 // return an integer exit code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Microsoft. All rights reserved. Licensed under the MIT license. See full license at the bottom of this file. | |
// Learn more about F# at http://fsharp.org | |
namespace Azure.IoTHub.Examples.FSharp.SimulatedDevice | |
open System | |
open System.Linq | |
open System.Text | |
open System.IO | |
open System.Device.Location | |
open System.Threading | |
open FSharp.Configuration | |
open Microsoft.Azure.Devices.Client | |
open Newtonsoft.Json | |
type Config = YamlConfig<FilePath="../config/config.yaml", ReadOnly=true> | |
/// One wind-speed observation, serialized to JSON before being sent.
type telemetryDataPoint =
    { /// Coordinate the reading is attributed to.
      location : GeoCoordinate
      /// Identifier of the reporting device.
      deviceId : string
      /// Observed wind speed.
      windSpeed : float
      /// Time of the observation.
      obsTime : DateTime }
module Main =

    /// Simulated-device entry point: loads the YAML configuration, creates the
    /// IoT Hub device client for the first configured device, then sends a
    /// "|"-joined batch of ten wind-speed readings every ten seconds.
    /// Returns 0 as the process exit code.
    [<EntryPoint>]
    let main argv =
        let config = Config()
        config.Load("../../../config/config.yaml")
        let iotHubUri = config.AzureIoTHubConfig.IoTHubUri
        let deviceKey = config.DeviceConfigs.First().Key
        let deviceId = config.DeviceConfigs.First().DeviceId
        let deviceClient = DeviceClient.Create(iotHubUri, new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, deviceKey))

        let avgWindSpeed = 10.
        let rand = new Random()

        // Serializes one reading for `location`; the device id is the configured
        // id suffixed with `index`, the speed is avgWindSpeed +/- 2.
        let windSpeedMessage location index =
            let telemetryReading = {
                deviceId = sprintf "%s%i" deviceId index
                windSpeed = (avgWindSpeed + rand.NextDouble() * 4. - 2.)
                location = location
                obsTime = DateTime.UtcNow
            }
            JsonConvert.SerializeObject(telemetryReading)

        // Picks a pseudo-random coordinate around (lat, long); the same seed
        // always yields the same point.
        let getRandomGeoCoordinate seed (lat : float) (long : float) (radius : float) : GeoCoordinate =
            // check out http://gis.stackexchange.com/a/68275 for where this calc originated.
            let rand = new Random(seed)
            let u = rand.NextDouble()
            let v = rand.NextDouble()
            // presumably 111000 is metres per degree, making `radius` metres - confirm
            let w = radius / 111000. * Math.Sqrt(u)
            let t = 2. * Math.PI * v
            // Math.Cos expects radians, while `lat` is given in degrees.
            let latRadians = lat * Math.PI / 180.
            let x = (w * Math.Cos(t)) / Math.Cos(latRadians)
            let y = w * Math.Sin(t)
            GeoCoordinate(y+lat, x+long)

        // Sends `data` to IoT Hub and logs it once the send has completed.
        let dataSendTask (data : string) =
            async {
                use message = new Message(Encoding.UTF8.GetBytes(data))
                // do! waits for the send so failures surface to the caller.
                do! deviceClient.SendEventAsync(message) |> Async.AwaitTask
                // This version sends uncompressed data, so log it directly.
                printfn "%O > Sending message %s" (DateTime.Now.ToString()) data
            }

        // Curried arguments: the negative longitude is parenthesised so it is
        // not parsed as a subtraction.
        let msftSites = Array.init 10 (fun index -> getRandomGeoCoordinate index 47.643417 (-122.126083) 20000.)

        // Start Cloud to Device Reader
        // NOTE(review): dataReceiveTask is not defined in the visible source;
        // it has to be supplied elsewhere for this to compile.
        dataReceiveTask deviceClient |> Async.Start

        let batchDataStreamTasks =
            Seq.initInfinite (fun _ ->
                String.concat "|" (msftSites |> Array.mapi (fun idx site -> windSpeedMessage site idx)))

        batchDataStreamTasks
        |> Seq.iter (fun x ->
            dataSendTask x |> Async.RunSynchronously
            Async.Sleep 10000 |> Async.RunSynchronously)

        // Only reached if the loop above ends; the batch sequence is infinite.
        Console.ReadLine() |> ignore
        0 // return an integer exit code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Picks a pseudo-random coordinate around (`lat`, `long`), both in degrees.
/// The same `seed` always yields the same point.
let getRandomGeoCoordinate seed (lat : float) (long : float) (radius : float) : GeoCoordinate =
    // check out http://gis.stackexchange.com/a/68275 for where this calc originated.
    let rand = new Random(seed)
    let u = rand.NextDouble()
    let v = rand.NextDouble()
    // presumably 111000 is metres per degree, making `radius` metres - confirm
    let w = radius / 111000. * Math.Sqrt(u)
    let t = 2. * Math.PI * v
    // Math.Cos expects radians, while `lat` is given in degrees.
    let latRadians = lat * Math.PI / 180.
    let x = (w * Math.Cos(t)) / Math.Cos(latRadians)
    let y = w * Math.Sin(t)
    GeoCoordinate(y+lat, x+long)
/// Ten pseudo-random sites around (47.643417, -122.126083), seeded by index.
// Arguments are curried; the stray comma previously turned them into a tuple,
// and the negative longitude needs parentheses to avoid parsing as subtraction.
let msftSites = Array.init 10 (fun index -> getRandomGeoCoordinate index 47.643417 (-122.126083) 20000.)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Centre of the simulated wind-speed range.
let baseWindSpeed = 10.
let rand = new Random()

/// Serializes one telemetry reading for `location` to JSON. The device id is
/// `deviceId` suffixed with `index`; the speed is baseWindSpeed +/- 2.
let windSpeedMessage location index =
    let telemetryReading = {
        deviceId = sprintf "%s%i" deviceId index
        windSpeed = (baseWindSpeed + rand.NextDouble() * 4. - 2.)
        location = location
        // Every record field must be supplied; obsTime was missing here.
        obsTime = DateTime.UtcNow
    }
    JsonConvert.SerializeObject(telemetryReading)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Name of the Event Hub that receives the shredded messages.
let eventHubName = "postshred"

/// Event Hub connection string, read from the "EventHubConnectionString"
/// setting (environment variable) so the shared access key never lives in
/// source control. Fails fast when the setting is missing.
// NOTE(review): the key previously committed here should be rotated.
let connectionString =
    match System.Environment.GetEnvironmentVariable("EventHubConnectionString") with
    | null | "" -> failwith "Setting 'EventHubConnectionString' is not configured"
    | value -> value
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Decodes the Base64 text, reads the 4-byte length prefix, gunzips the
/// remainder and returns it as a UTF-8 string.
/// Raises ArgumentException when the payload is too short to hold the prefix
/// or the prefix is negative.
let decompress (data : string) =
    let gzipBuffer = Convert.FromBase64String(data)
    if gzipBuffer.Length < 4 then
        invalidArg "data" "payload is too short to contain the 4-byte length prefix"
    let dataLength = BitConverter.ToInt32(gzipBuffer, 0)
    if dataLength < 0 then
        invalidArg "data" "length prefix is negative"
    let buffer = Array.zeroCreate<byte> dataLength
    // Wrap the compressed bytes in place instead of copying them into a new stream.
    use memoryStream = new MemoryStream(gzipBuffer, 4, gzipBuffer.Length - 4, false)
    use zip = new GZipStream(memoryStream, CompressionMode.Decompress)
    // Stream.Read may return fewer bytes than requested, so keep reading until
    // the buffer is full or the stream ends.
    let rec fill offset =
        if offset >= buffer.Length then offset
        else
            let read = zip.Read(buffer, offset, buffer.Length - offset)
            if read > 0 then fill (offset + read) else offset
    let total = fill 0
    // Decode only the bytes actually produced, never the zero-filled tail.
    Encoding.UTF8.GetString(buffer, 0, total)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#r "Microsoft.ServiceBus.dll" | |
#r "System.Runtime.Serialization.dll" | |
open System | |
open System.Text | |
open System.IO | |
open System.Diagnostics | |
open System.IO.Compression | |
open Microsoft.ServiceBus.Messaging | |
open System.Runtime.Serialization |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "iot-fsharp-demo", | |
"version": "0.0.1", | |
"description": "Example decompression & shred Azure Function", | |
"copyright": "Microsoft", | |
"title": "Decompression & Shred Azure Function", | |
"authors": ["Microsoft"], | |
"language": "F#", | |
"frameworks": { | |
"net46":{ | |
"dependencies": { | |
"WindowsAzure.ServiceBus": "3.4.0" | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let Run (input: string, log: TraceWriter) = | |
let groupedData = decompress input | |
let eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#r "Microsoft.ServiceBus.dll" | |
#r "System.Runtime.Serialization.dll" | |
open System | |
open System.Text | |
open System.IO | |
open System.Diagnostics | |
open System.IO.Compression | |
open Microsoft.ServiceBus.Messaging | |
open System.Runtime.Serialization | |
/// Name of the Event Hub that receives the shredded messages.
let eventHubName = "postshred"

/// Event Hub connection string, read from the "EventHubConnectionString"
/// setting (environment variable) so the shared access key never lives in
/// source control. Fails fast when the setting is missing.
// NOTE(review): the key previously committed here should be rotated.
let connectionString =
    match Environment.GetEnvironmentVariable("EventHubConnectionString") with
    | null | "" -> failwith "Setting 'EventHubConnectionString' is not configured"
    | value -> value
/// Decodes the Base64 text, reads the 4-byte length prefix, gunzips the
/// remainder and returns it as a UTF-8 string.
/// Raises ArgumentException when the payload is too short to hold the prefix
/// or the prefix is negative.
let decompress (data : string) =
    let gzipBuffer = Convert.FromBase64String(data)
    if gzipBuffer.Length < 4 then
        invalidArg "data" "payload is too short to contain the 4-byte length prefix"
    let dataLength = BitConverter.ToInt32(gzipBuffer, 0)
    if dataLength < 0 then
        invalidArg "data" "length prefix is negative"
    let buffer = Array.zeroCreate<byte> dataLength
    // Wrap the compressed bytes in place instead of copying them into a new stream.
    use memoryStream = new MemoryStream(gzipBuffer, 4, gzipBuffer.Length - 4, false)
    use zip = new GZipStream(memoryStream, CompressionMode.Decompress)
    // Stream.Read may return fewer bytes than requested, so keep reading until
    // the buffer is full or the stream ends.
    let rec fill offset =
        if offset >= buffer.Length then offset
        else
            let read = zip.Read(buffer, offset, buffer.Length - offset)
            if read > 0 then fill (offset + read) else offset
    let total = fill 0
    // Decode only the bytes actually produced, never the zero-filled tail.
    Encoding.UTF8.GetString(buffer, 0, total)
/// Function entry point: decompresses `input`, splits it on "|" and forwards
/// every non-empty piece to the Event Hub as its own event, logging each one.
let Run (input: string, log: TraceWriter) =
    let groupedData = decompress input
    let eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName)
    try
        groupedData.Split([|"|"|], StringSplitOptions.RemoveEmptyEntries)
        |> Array.iter (fun data ->
            // Release each event once it has been sent.
            use eventData = new EventData(Encoding.UTF8.GetBytes(data))
            eventHubClient.Send(eventData)
            log.Info(sprintf "F# Queue trigger function processed: '%s'" data))
    finally
        // A client is created per invocation, so close it even when a send fails.
        eventHubClient.Close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"location":{"Latitude":40.572785337127179,"Longitude":73.94234667988286,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId0","windSpeed":10.904973079871839,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.77039546021787,"Longitude":73.936369994380513,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId1","windSpeed":8.9946743366283712,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.802419980175664,"Longitude":74.137364620009123,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId2","windSpeed":11.084375593384902,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.620429899604716,"Longitude":74.0377327342062,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId3","windSpeed":9.174076850141434,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.70360108888142,"Longitude":73.8420657430119,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId4","windSpeed":11.263778106897966,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.815168724564174,"Longitude":74.028574918004367,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId5","windSpeed":9.3534793636544968,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.634263716975241,"Longitude":74.154688016340259,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId6","windSpeed":11.443180620411029,"obsTime":"2016-11-28T20:14:07.4050906Z"}
|{"location":{"Latitude":40.632095640981504,"Longitude":73.928272253624172,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId7","windSpeed":9.5328818771675614,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.8602118423841,"Longitude":73.917591851158747,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId8","windSpeed":11.622583133924092,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.743483308358982,"Longitude":74.1206610716174,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId9","windSpeed":9.7122843906806242,"obsTime":"2016-11-28T20:14:07.4050906Z"} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Registry client created from `connectionString`.
let registryManager = RegistryManager.CreateFromConnectionString(connectionString)

/// Starts registering a new device with the given id; returns the pending task.
let addDevice deviceId =
    let newDevice = new Device(deviceId)
    registryManager.AddDeviceAsync(newDevice)

/// Prints the primary symmetric key of `device`.
let printDeviceKey (device: Device) =
    let primaryKey = device.Authentication.SymmetricKey.PrimaryKey
    printfn "Generated device key: %A" primaryKey
/// Copies the primary key of `device` into every config entry whose DeviceId
/// matches, saves the configuration to `configFilePath` and returns `device`
/// so the call can be chained in a pipeline.
let writeDeviceKey (device : Device) =
    let primaryKey = device.Authentication.SymmetricKey.PrimaryKey
    for dc in config.DeviceConfigs do
        // Ids are identifiers, not prose: compare ordinally rather than with
        // the culture-sensitive String.Compare.
        if String.Equals(dc.DeviceId, device.Id, StringComparison.Ordinal) then
            dc.Key <- primaryKey
    config.Save(configFilePath)
    device
// Register the device, wait for the result, persist its key, then print it.
writeDeviceKey (addDevice deviceId |> Async.AwaitTask |> Async.RunSynchronously)
|> printDeviceKey
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Starts fetching the device with the given id; returns the pending task.
let getDevice deviceId = registryManager.GetDeviceAsync(deviceId)

/// Fetches the already-registered device, then persists and prints its key.
let useExistingDevice () =
    getDevice deviceId
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> writeDeviceKey
    |> printDeviceKey

// Register the device; if it already exists, reuse the registered one.
// Any other failure propagates instead of being silently dropped.
try
    addDevice deviceId
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> writeDeviceKey
    |> printDeviceKey
with
// The exception may arrive unwrapped or inside an AggregateException.
| :? DeviceAlreadyExistsException -> useExistingDevice ()
| :? System.AggregateException as e
    when e.Flatten().InnerExceptions
         |> Seq.exists (fun ex -> ex :? DeviceAlreadyExistsException) ->
    useExistingDevice ()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Linq | |
open FSharp.Configuration | |
type Config = YamlConfig<FilePath="../config/config.yaml"> | |
module Main =

    /// Loads the YAML configuration, prints the IoT Hub connection string and
    /// waits for Enter before exiting with code 0.
    [<EntryPoint>]
    let main argv =
        let configFilePath = "../../../config/config.yaml"
        let config = Config()
        config.Load(configFilePath)
        config.AzureIoTHubConfig.ConnectionString |> printfn "%s"
        Console.ReadLine() |> ignore
        0 // return an integer exit code
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment