Skip to content

Instantly share code, notes, and snippets.

@ivanminutillo
Last active December 14, 2024 15:09
Show Gist options
  • Save ivanminutillo/59ac30364e85e3316d5c7b6a0c59ed5c to your computer and use it in GitHub Desktop.
Save ivanminutillo/59ac30364e85e3316d5c7b6a0c59ed5c to your computer and use it in GitHub Desktop.
Feed Refactoring
1. Feed Types:
├── PersonalFeeds (my_feed, following)
├── InstanceFeeds (local, federated)
├── ObjectFeeds (threads, media)
└── SystemFeeds (notifications, outbox)
2. Feed Publication Flow:
Activity Creation -> Feed Selection    -> Publication  -> Real-time Updates
└── Activity         └── Target Feeds     └── Storage     └── PubSub Broadcast
    - Post               - Personal           - DB Write      - LiveView Updates
    - Reply              - Instance           - Indices       - Federation
    - Boost              - Object             - Cache         - Notifications
3. Feed Query Flow:
Request     -> Boundaries -> Query Building -> Execution -> Loading     -> Response
└── Filters    └── Perms     └── Optimized     └── DB       └── Preload    └── Page
    - Types        - User        Queries           Query        Batching       Results
    - Scope        - Instance    - Cursors         - Indices    Strategy
4. Proposed Module Structure:
Bonfire.Social.Feeds
├── Core # Core feed functionality
│ ├── FeedRegistry # Feed type registration & configuration
│ ├── FeedPublisher # Handle publishing to feeds
│ └── FeedQuery # Feed querying & pagination
├── Types # Different feed implementations
│ ├── PersonalFeed # Personal/following feeds
│ ├── InstanceFeed # Local/federated feeds
│ ├── ObjectFeed # Object-specific feeds
│ └── SystemFeed # System feeds (notifications, etc)
├── Boundaries # Access control & permissions
│ ├── FeedBoundary # Feed-level permissions
│ ├── ActivityBoundary # Activity-level permissions
│ └── InstanceBoundary # Instance-level boundaries
├── Storage # Storage concerns
│ ├── FeedStore # Feed persistence
│ ├── ActivityStore # Activity persistence
│ └── CacheStore # Caching layer
└── RealTime # Real-time functionality
    ├── FeedPubSub # PubSub for feed updates
    ├── Broadcaster # Handle broadcasting updates
    └── Listener # Handle incoming updates
## Key workflows
1. Publishing to Feeds:
Bonfire.Social.Feeds.publish(activity, target_feeds, opts)
|> FeedRegistry.resolve_feeds() # Resolve feed targets
|> FeedBoundary.check_publication() # Check publication permissions
|> FeedPublisher.publish() # Handle actual publication
|> Broadcaster.notify() # Handle real-time updates
2. Querying feeds:
Bonfire.Social.Feeds.query(feed_id, filters, opts)
|> FeedRegistry.get_feed_type() # Get feed implementation
|> FeedBoundary.check_access() # Check access permissions
|> FeedQuery.build_query() # Build optimized query
|> FeedQuery.execute() # Execute with pagination
|> FeedQuery.load_results() # Load & preload results
3. Real-time updates:
Bonfire.Social.Feeds.RealTime.Broadcaster.broadcast(feed_id, update)
|> PubSub.broadcast() # Broadcast to subscribers
|> Federation.maybe_federate() # Handle federation
|> Notifications.maybe_notify() # Handle notifications
defmodule Bonfire.Social.FeedActivities do
  @moduledoc """
  Core module for handling feed publishing and activity management.

  Resolves which feeds an activity belongs in, checks that the current user may
  publish there, writes the `FeedPublish` rows and broadcasts the activity on one
  PubSub topic per feed.
  """
  use Arrows
  use Untangle
  import Ecto.Query
  alias Bonfire.Data.Social.{Activity, FeedPublish}
  alias Bonfire.Social.FeedQueries
  alias Bonfire.Social.Activities

  # NOTE(review): `repo/0`, `extract_mentions/1`, `mentions_to_notification_feeds/1`
  # and `can_publish_to_feed?/2` are called below but are not defined in this
  # module. Presumably they are meant to be imported or still need to be written;
  # confirm before relying on this module compiling.

  # Maximum number of rows sent to the database per `insert_all` call.
  @insert_batch_size 100

  @doc """
  Publishes an activity to the feeds it belongs in.

  Handles feed selection, permission checks, persistence and real-time updates.

  ## Options

    * `:to_feeds` - feed ID or list of feed IDs to publish to, in addition to the
      automatically resolved ones
    * `:current_user` - user publishing the activity
    * `:boundary` - boundary setting for the activity (defaults to `"public"`)

  ## Returns

    * `{:ok, publishes}` with the inserted `FeedPublish` entries
    * `{:error, :unauthorized}` when a permission check fails

  ## Examples

      publish(activity,
        to_feeds: ["feed_id"],
        current_user: current_user
      )
  """
  def publish(activity, opts \\ []) do
    # `check_publish_permissions/3` returns a bare `:ok` on success, so that is
    # what must be matched here; matching `{:ok, _}` would make every successful
    # check fall through the `with` and skip publication entirely.
    with {:ok, feed_ids} <- resolve_target_feeds(activity, opts),
         :ok <- check_publish_permissions(activity, feed_ids, opts),
         {:ok, publishes} <- create_feed_publishes(activity, feed_ids) do
      broadcast_activity(activity, feed_ids)
      {:ok, publishes}
    end
  end

  @doc """
  Resolves which feeds an activity should be published to, based on its creator,
  boundary, mentions, reply target and the explicitly specified `:to_feeds`.

  Always returns `{:ok, feed_ids}`. The list contains no `nil`s and no duplicates,
  so each feed is written to and broadcast on at most once.
  """
  def resolve_target_feeds(activity, opts) do
    feed_ids =
      []
      |> maybe_add_creator_outbox(activity)
      |> maybe_add_instance_feeds(opts)
      |> maybe_add_mentioned_feeds(activity)
      |> maybe_add_reply_feeds(activity)
      |> maybe_add_specified_feeds(opts[:to_feeds])
      # A resolver may yield `nil` (e.g. a reply without a thread id) and several
      # resolvers may yield the same feed.
      |> Enum.reject(&is_nil/1)
      |> Enum.uniq()

    {:ok, feed_ids}
  end

  # Adds the creator's outbox when the creator has one.
  defp maybe_add_creator_outbox(feeds, activity) do
    case Activities.object_creator(activity) do
      %{outbox_id: outbox_id} when not is_nil(outbox_id) -> [outbox_id | feeds]
      _ -> feeds
    end
  end

  # Adds the instance-wide feed matching the boundary setting, if any.
  defp maybe_add_instance_feeds(feeds, opts) do
    case get_boundary_setting(opts) do
      "public" -> [named_feed_id(:local) | feeds]
      "public_remote" -> [named_feed_id(:activity_pub) | feeds]
      _ -> feeds
    end
  end

  # Adds the notification feeds of everyone mentioned in the activity.
  defp maybe_add_mentioned_feeds(feeds, activity) do
    case extract_mentions(activity) do
      [] -> feeds
      mentions -> mentions_to_notification_feeds(mentions) ++ feeds
    end
  end

  # Adds the thread feed when the activity is a reply.
  defp maybe_add_reply_feeds(feeds, activity) do
    case Activities.get_replied_to(activity) do
      {:ok, replied_to} -> [Activities.get_thread_id(replied_to) | feeds]
      _ -> feeds
    end
  end

  # Accepts `nil`, a single feed id or a list of feed ids.
  defp maybe_add_specified_feeds(feeds, to_feeds), do: List.wrap(to_feeds) ++ feeds

  @doc """
  Checks if the activity can be published to the specified feeds
  based on user permissions and boundaries.

  Reads the user from `opts[:current_user]`. Returns `:ok` or
  `{:error, :unauthorized}`.
  """
  def check_publish_permissions(activity, feed_ids, opts) do
    current_user = opts[:current_user]

    with :ok <- check_creator_permission(activity, current_user) do
      check_feed_permissions(feed_ids, current_user)
    end
  end

  defp check_creator_permission(activity, current_user) do
    if Activities.created_by?(activity, current_user) do
      :ok
    else
      {:error, :unauthorized}
    end
  end

  # Stops at the first feed the user may not publish to.
  defp check_feed_permissions(feed_ids, current_user) do
    if Enum.all?(feed_ids, &can_publish_to_feed?(&1, current_user)) do
      :ok
    else
      {:error, :unauthorized}
    end
  end

  @doc """
  Creates feed publish entries for an activity in the specified feeds.

  Rows are inserted in batches of #{@insert_batch_size}. Rows that conflict with
  an existing entry are skipped (`on_conflict: :nothing`), so only the entries
  actually inserted are returned in `{:ok, publishes}`.
  """
  def create_feed_publishes(activity, feed_ids) do
    now = DateTime.utc_now()

    publishes =
      feed_ids
      |> Enum.map(&build_feed_publish(activity, &1, now))
      |> Enum.chunk_every(@insert_batch_size)
      |> Enum.flat_map(&insert_feed_publishes/1)

    {:ok, publishes}
  end

  # `insert_all` takes maps or keyword lists, not schema structs, so the row is
  # built as a plain map.
  # NOTE(review): assumes `FeedPublish` has an `inserted_at` field that accepts a
  # microsecond-precision `DateTime` — confirm against the schema.
  defp build_feed_publish(activity, feed_id, inserted_at) do
    %{
      id: activity.id,
      feed_id: feed_id,
      inserted_at: inserted_at
    }
  end

  defp insert_feed_publishes(publishes) do
    {_count, entries} =
      repo().insert_all(
        FeedPublish,
        publishes,
        on_conflict: :nothing,
        returning: true
      )

    entries
  end

  @doc """
  Broadcasts a `{:new_activity, %{activity: activity, feed_id: feed_id}}` message
  on the `"feed:<feed_id>"` PubSub topic of every given feed.

  Returns `:ok`.
  """
  def broadcast_activity(activity, feed_ids) do
    Enum.each(feed_ids, fn feed_id ->
      Phoenix.PubSub.broadcast(
        Bonfire.PubSub,
        "feed:#{feed_id}",
        {:new_activity, %{activity: activity, feed_id: feed_id}}
      )
    end)
  end

  @doc """
  Deletes an activity from all feeds and broadcasts the deletion.

  Returns `{:ok, count}` with the number of feed entries removed, or whatever
  error the broadcast returns.
  """
  def delete_activity(activity_id) when is_binary(activity_id) do
    with {count, _} <-
           repo().delete_all(from(fp in FeedPublish, where: fp.id == ^activity_id)),
         :ok <- broadcast_activity_deletion(activity_id) do
      {:ok, count}
    end
  end

  # NOTE(review): deletions go to the single "feed:activities" topic, whereas new
  # activities go to per-feed "feed:<feed_id>" topics; subscribers of the latter
  # will not see deletions unless they also subscribe here — confirm this is
  # intended.
  defp broadcast_activity_deletion(activity_id) do
    Phoenix.PubSub.broadcast(
      Bonfire.PubSub,
      "feed:activities",
      {:activity_deleted, activity_id}
    )
  end

  # Helper functions for feed identification
  defp named_feed_id(:local), do: "feed_local"
  defp named_feed_id(:activity_pub), do: "feed_federated"

  defp get_boundary_setting(opts), do: opts[:boundary] || "public"
end
defmodule Bonfire.Social.FeedQueries do
  @moduledoc """
  Handles core feed querying: base query, filters, boundaries, cursor pagination
  and preloading of the resulting page.

  Feeds are listed newest first, ordered by the activity's `(inserted_at, id)`.
  """
  use Arrows
  use Untangle
  import Ecto.Query
  alias Bonfire.Data.Social.{Activity, FeedPublish}
  alias Bonfire.Social.Activities

  # NOTE(review): `repo/0` is called below but is not defined in this module;
  # presumably it is meant to be imported — confirm.

  # Page size used when no `:limit` option is given.
  @default_limit 20

  @doc """
  Main entry point for fetching paginated feed activities with filtering.

  ## Options

    * `:filters` - map of filters to apply to the query (see `apply_filters/2`)
    * `:cursor` - pagination cursor taken from a previous page's `metadata.after`
    * `:limit` - number of items per page (defaults to #{@default_limit})
    * `:current_user` - user requesting the feed (for boundaries)
    * `:preloads` - list of associations to preload on each entry

  ## Returns

    * `{:ok, %{entries: entries, metadata: %{limit: limit, after: cursor_or_nil}}}`
    * `{:error, :invalid_cursor}` when `:cursor` cannot be decoded

  ## Examples

      {:ok, %{entries: entries, metadata: metadata}} =
        feed_paginated("feed_id",
          filters: %{federation_scope: :local},
          cursor: cursor,
          current_user: current_user
        )
  """
  def feed_paginated(feed_id, opts \\ []) when is_binary(feed_id) do
    with {:ok, query} <- build_base_query(feed_id),
         {:ok, filtered} <- apply_filters(query, opts[:filters]),
         {:ok, bounded} <- apply_boundaries(filtered, opts[:current_user]),
         {:ok, paginated} <- paginate_results(bounded, opts) do
      # The query must actually be run before results can be processed.
      page = fetch_page(paginated, opts)
      {:ok, process_results(page, opts)}
    end
  end

  @doc """
  Builds the base query for a feed.

  Selects the `FeedPublish` entries of `feed_id` joined to their `Activity`, with
  the joined activity preloaded so that entries are schema structs that can be
  preloaded further and can be used to derive the next cursor.

  Binding order is `[feed_publish, activity]`; later steps rely on it.
  """
  def build_base_query(feed_id) do
    query =
      from(fp in FeedPublish,
        join: a in Activity,
        on: a.id == fp.id,
        where: fp.feed_id == ^feed_id,
        preload: [activity: a]
      )

    {:ok, query}
  end

  @doc """
  Applies filters to the query.

  Supported filters:

    * `federation_scope: :local | :federated`
    * `object_type: type`
    * `time_limit: days` (integer)

  Unsupported keys or values (including `nil`) leave the query unchanged.
  Anything other than a map is treated as "no filters".
  """
  def apply_filters(query, filters) when is_map(filters) do
    {:ok, Enum.reduce(filters, query, &apply_filter/2)}
  end

  def apply_filters(query, _), do: {:ok, query}

  # Each join below is referenced through `[..., x]` (the last binding) rather
  # than by position, so combining several filters does not mix up the joins.

  # Federation filtering
  defp apply_filter({:federation_scope, :local}, query) do
    query
    |> join(:left, [fp, a], c in assoc(a, :subject_character))
    |> where([..., c], is_nil(c.peered))
  end

  defp apply_filter({:federation_scope, :federated}, query) do
    query
    |> join(:left, [fp, a], c in assoc(a, :subject_character))
    |> where([..., c], not is_nil(c.peered))
  end

  # Object type filtering
  defp apply_filter({:object_type, type}, query) when not is_nil(type) do
    query
    |> join(:inner, [fp, a], o in assoc(a, :object))
    |> where([..., o], o.table_id == ^type)
  end

  # Time range filtering
  defp apply_filter({:time_limit, days}, query) when is_integer(days) do
    since = DateTime.add(DateTime.utc_now(), -days * 86_400)
    where(query, [fp, a], a.inserted_at >= ^since)
  end

  # Unknown filter or unsupported value: leave the query untouched.
  defp apply_filter(_filter, query), do: query

  @doc """
  Applies boundary checks by delegating to `Activities.as_permitted_for/2`.

  Returns `{:ok, query}`.
  """
  def apply_boundaries(query, current_user) do
    {:ok, Activities.as_permitted_for(query, current_user)}
  end

  @doc """
  Adds ordering, limit and (when `:cursor` is given) the cursor condition.

  `opts` may be a keyword list or a map. Returns `{:ok, query}` or
  `{:error, :invalid_cursor}`.
  """
  def paginate_results(query, opts) do
    page_size = page_limit(opts)

    # `opts[...]` works for keyword lists as well as maps; a map pattern would
    # never match the keyword list that `feed_paginated/2` passes in.
    case opts[:cursor] do
      cursor when is_binary(cursor) -> paginate_from_cursor(query, cursor, page_size)
      _ -> paginate_first_page(query, page_size)
    end
  end

  # Continues after the cursor in the same newest-first order as the first page.
  # Written as plain Ecto expressions so parameters are cast to the column types.
  defp paginate_from_cursor(query, cursor, page_size) do
    with {:ok, {ts, id}} <- decode_cursor(cursor) do
      paginated =
        query
        |> where(
          [fp, a],
          a.inserted_at < ^ts or (a.inserted_at == ^ts and a.id < ^id)
        )
        |> order_by([fp, a], desc: a.inserted_at, desc: a.id)
        |> limit(^page_size)

      {:ok, paginated}
    end
  end

  defp paginate_first_page(query, page_size) do
    paginated =
      query
      |> order_by([fp, a], desc: a.inserted_at, desc: a.id)
      |> limit(^page_size)

    {:ok, paginated}
  end

  defp page_limit(opts), do: opts[:limit] || @default_limit

  # Runs the paginated query and wraps the rows in a page map.
  defp fetch_page(query, opts) do
    entries = repo().all(query)
    page_size = page_limit(opts)

    %{
      entries: entries,
      metadata: %{limit: page_size, after: next_cursor(entries, page_size)}
    }
  end

  # A short page means there is nothing left to fetch.
  defp next_cursor(entries, page_size) when length(entries) < page_size, do: nil
  defp next_cursor(entries, _page_size), do: entries |> List.last() |> encode_cursor()

  @doc """
  Preloads associations on the entries of a page.

  Uses `opts[:preloads]` when given, otherwise the default preloads. Returns the
  page with its `:entries` replaced by the preloaded entries.
  """
  def process_results(%{entries: entries} = page, opts) do
    preloads = opts[:preloads] || default_preloads()
    %{page | entries: repo().preload(entries, preloads)}
  end

  defp default_preloads do
    [
      :activity,
      activity: [
        :subject,
        :verb,
        replied: [:reply_to],
        object: [:creator]
      ]
    ]
  end

  # Cursor format: Base64 of "<ISO 8601 timestamp>:<id>".
  defp encode_cursor(%{id: id, activity: %{inserted_at: ts}})
       when is_struct(ts, DateTime) or is_struct(ts, NaiveDateTime) do
    Base.encode64(format_timestamp(ts) <> ":" <> to_string(id))
  end

  defp encode_cursor(_entry), do: nil

  defp format_timestamp(%DateTime{} = ts), do: DateTime.to_iso8601(ts)
  # assumes naive timestamps are stored in UTC — TODO confirm
  defp format_timestamp(%NaiveDateTime{} = ts), do: NaiveDateTime.to_iso8601(ts) <> "Z"

  # The timestamp itself contains ":" characters, so the id is taken to be the
  # part after the LAST ":" and everything before it is the timestamp.
  defp decode_cursor(cursor) do
    with {:ok, decoded} <- Base.decode64(cursor),
         {ts_parts, [id]} when id != "" <-
           decoded |> String.split(":") |> Enum.split(-1),
         {:ok, ts, _offset} <- ts_parts |> Enum.join(":") |> DateTime.from_iso8601() do
      {:ok, {ts, id}}
    else
      _ -> {:error, :invalid_cursor}
    end
  end
end
defmodule Bonfire.Social.Repo.Migrations.AddFeedIndices do
  @moduledoc """
  Adds the indices used by feed publishing and feed queries.

  Every index is created under an explicit name, so the rollback must drop it
  under that same name. Both directions are therefore generated from one list of
  `{table, columns, name}` definitions.
  """
  use Ecto.Migration

  # NOTE(review): several of these columns (e.g. `feed_publish.activity_id`,
  # `feed_publish.subject_id`, `activity.federated_id`) are not referenced by the
  # feed modules; verify they exist in the schema before running.
  # Consider dropping unused indices in a separate migration.
  @indices [
    # Core feed indices
    {:feed_publish, [:feed_id, :inserted_at, :id], "feed_publish_feed_ordering_idx"},
    # For efficient activity joins and lookups
    {:feed_publish, [:activity_id], "feed_publish_activity_idx"},
    # For federation scoping
    {:character, [:peered], "character_federation_idx"},
    # For efficient activity sorting and pagination
    {:activity, [:inserted_at, :id], "activity_time_ordering_idx"},
    # For efficient boundary checks
    {:activity, [:subject_id], "activity_subject_idx"},
    # For efficient object type filtering
    {:activity, [:object_id, :object_type], "activity_object_type_idx"},
    # For reply threading
    {:activity, [:replied_to_id], "activity_replied_to_idx"},
    # For federation syncing
    {:activity, [:federated_id], "activity_federation_idx"},
    # Compound index for efficient ordering with federation scope
    {:activity, [:federated_id, :inserted_at, :id], "activity_federation_ordering_idx"},
    # For efficient boundaries in feeds
    {:feed_publish, [:feed_id, :subject_id], "feed_publish_subject_idx"},
    # For circle/boundaries filtering
    {"circle_members", [:circle_id, :member_id], "circle_members_compound_idx"}
  ]

  # Creates every index in the order listed above.
  def up do
    for {table, columns, name} <- @indices do
      create(index(table, columns, name: name))
    end
  end

  # Drops the same indices, by their explicit names, in reverse order.
  def down do
    for {table, columns, name} <- Enum.reverse(@indices) do
      drop_if_exists(index(table, columns, name: name))
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment