Skip to content

Instantly share code, notes, and snippets.

@rmb938
Created September 26, 2025 19:46
Show Gist options
  • Select an option

  • Save rmb938/e3d14026c8d8ef524a61c269e1499598 to your computer and use it in GitHub Desktop.

Select an option

Save rmb938/e3d14026c8d8ef524a61c269e1499598 to your computer and use it in GitHub Desktop.
input:
generate:
interval: 1s # Max 1s so API devs don't get mad
batch_size: 1
auto_replay_nacks: true
# Set the timestamp in the message, starting at 1615190400 (Monday, March 8, 2021 8:00:00 AM GMT) and add 300 seconds each time
count: 477307 # Maximum this many 5 min intervals since 1615190400, as of Sept 20th 2025
mapping: |
root = {}
root.timestamp = 1615190400 + (300 * counter())
pipeline:
processors:
# Load the current timestamp offset from cache
- branch:
request_map: 'root = ""'
processors:
# Try to pull mapping from cache
- cache:
resource: mapping_cache
key: osrs_item_mapping
operator: get
# Otherwise store it in the cache
- catch:
- http:
url: https://prices.runescape.wiki/api/v1/osrs/mapping
verb: GET
headers:
# Custom User agent with discord username as recommended by API devs
User-Agent: WarpStream - Bento - @rmb938 in Discord
- mapping: |
root = this.map_each(item -> {
(item.id.string()): item
}).squash()
- cache:
resource: mapping_cache
key: osrs_item_mapping
operator: set
ttl: 1h
value: "${! content() }"
result_map: |
root.item_mapping = this
# Make the http call on 5m endpoint, using branch so we don't overwrite root and clear the root.item_mapping
- branch:
processors:
- http:
url: https://prices.runescape.wiki/api/v1/osrs/5m?timestamp=${! this.timestamp }
verb: GET
headers:
# Custom User agent with discord username as recommended by API devs
User-Agent: WarpStream - Bento - @rmb938 in Discord
parallel: false
result_map: |
root.data = this.data
# Copy the timestamp into all the data items
- mapping: |
root.data = this.data.map_each(item -> item.value.merge({"item_id": item.key.number(), "timestamp": this.timestamp, "name": this.item_mapping.get(item.key).name, "icon": this.item_mapping.get(item.key).icon}))
# Move all the data items into the root
- mapping: |
root = this.data
# Split the dict into their own messages
- unarchive:
format: json_map
# Remove avgHighPrice and highPriceVolume if volume is 0
# This means there was no insta buy during this time period
- mapping: |
root = this
root.avgHighPrice = if this.highPriceVolume == 0 { deleted() } else { this.avgHighPrice }
root.highPriceVolume = if this.highPriceVolume == 0 { deleted() } else { this.highPriceVolume }
# Remove avgLowPrice and lowPriceVolume if volume is 0
# This means there was no insta sell during this time period
- mapping: |
root = this
root.avgLowPrice = if this.lowPriceVolume == 0 { deleted() } else { this.avgLowPrice }
root.lowPriceVolume = if this.lowPriceVolume == 0 { deleted() } else { this.lowPriceVolume }
# Convert unix time to timestamp
- mapping: |
root = this
root.timestamp = this.timestamp.ts_format("2006-01-02T15:04:05Z07:00", "UTC")
output:
broker:
pattern: fan_out
outputs:
# - kafka_franz:
# seed_brokers:
# - localhost:9092
# topic: osrs_prices_5m
# key: ${! this.id }
# partitioner: murmur2_hash
# compression: zstd
# # Recommended WarpStream config
# metadata_max_age: 60s
# max_buffered_records: 1000000
# max_message_bytes: 16000000
- stdout:
codec: lines
cache_resources:
- label: mapping_cache
memory: {}
- label: timestamp_offset
memory: {}
input:
http_client:
# Query 5m with timestamp
# Timestamp is rounded to nearest 5 minute interval then goes back in time by 10 mins so we have data.
# If we don't go back in time by 10 mins, we query the current 5 min interval which may not be populated at query time.
url: https://prices.runescape.wiki/api/v1/osrs/5m?timestamp=${! ((((timestamp_unix().number() / 300).round()) * 300) - 600) }
verb: GET
headers:
# Custom User agent with discord username as recommended by API devs
User-Agent: WarpStream - Bento - @rmb938 in Discord
rate_limit: osrs_prices_5m
timeout: 5s
auto_replay_nacks: true
# Only query once every 5 minutes since data changes only once every 5 minutes
rate_limit_resources:
- label: osrs_prices_5m
local:
count: 1
interval: 5m
pipeline:
processors:
- branch:
request_map: 'root = ""'
processors:
# Try to pull mapping from cache
- cache:
resource: mapping_cache
key: osrs_item_mapping
operator: get
# Otherwise store it in the cache
- catch:
- http:
url: https://prices.runescape.wiki/api/v1/osrs/mapping
verb: GET
headers:
# Custom User agent with discord username as recommended by API devs
User-Agent: WarpStream - Bento - @rmb938 in Discord
- mapping: |
root = this.map_each(item -> {
(item.id.string()): item
}).squash()
- cache:
resource: mapping_cache
key: osrs_item_mapping
operator: set
ttl: 1h
value: "${! content() }"
result_map: |
root.item_mapping = this
# Copy the timestamp into all the data items
- mapping: |
root.data = this.data.map_each(item -> item.value.merge({"item_id": item.key.number(), "timestamp": this.timestamp, "name": this.item_mapping.get(item.key).name, "icon": this.item_mapping.get(item.key).icon}))
# Move all the data items into the root
- mapping: |
root = this.data
# Split the dict into their own messages
- unarchive:
format: json_map
# Add a generated event id
- mapping: |
root = this
root.event_id = uuid_v4()
# Remove avgHighPrice and highPriceVolume if volume is 0
# This means there was no insta buy during this time period
- mapping: |
root = this
root.avgHighPrice = if this.highPriceVolume == 0 { deleted() } else { this.avgHighPrice }
root.highPriceVolume = if this.highPriceVolume == 0 { deleted() } else { this.highPriceVolume }
# Remove avgLowPrice and lowPriceVolume if volume is 0
# This means there was no insta sell during this time period
- mapping: |
root = this
root.avgLowPrice = if this.lowPriceVolume == 0 { deleted() } else { this.avgLowPrice }
root.lowPriceVolume = if this.lowPriceVolume == 0 { deleted() } else { this.lowPriceVolume }
# Convert unix time to timestamp
- mapping: |
root = this
root.timestamp = this.timestamp.ts_format("2006-01-02T15:04:05Z07:00", "UTC")
output:
broker:
pattern: fan_out
outputs:
# - kafka_franz:
# seed_brokers:
# - localhost:9092
# topic: osrs_prices_5m
# key: ${! this.id }
# partitioner: murmur2_hash
# compression: zstd
# # Recommended WarpStream config
# metadata_max_age: 60s
# max_buffered_records: 1000000
# max_message_bytes: 16000000
- stdout:
codec: lines
cache_resources:
- label: mapping_cache
memory: {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment