Instrumenting block of code with ActiveSupport::Notifications

ActiveSupport::Notifications is an instrumentation toolkit for Ruby that is used by the Ruby on Rails framework internally. It was generally designed for listening to events during the entire life of the application but also provides two escape hatches to temporally instrument block of codes:

Providing callback (lambda / proc) to subscribed method:

callback = lambda {|name, started, finished, unique_id, payload| ... }
ActiveSupport::Notifications.subscribed(callback, event) do
  ...
end

Manually unsubscribing from the events:

subscriber = ActiveSupport::Notifications.subscribed(event) do |name, started, finished, unique_id, payload|
  ...
end

ActiveSupport::Notifications.unsubscribe(subscriber)

With that knowledge we can build a small debugging tool to inspect events during a block of code (i.e. requests made by excon gem):

def instrument(&block)
  requests = []
  subscriber = ActiveSupport::Notifications.subscribe('excon.request') do |_, _start, _end, _unique_id, payload|
    requests << payload
  end

  block.call

  requests
ensure
  ActiveSupport::Notifications.unsubscribe(subscriber)
end

requests = instrument(my_method_making_api_requests)

However, if you run this code on a multi-threaded server like Puma, you will run into a bug where events from requests in my_method_making_api_requests are captured from other threads as well. To fix it, we first need to understand how ActiveSupport::Notifications notifies listeners about the event.

Whenever we call ActiveSupport::Notifications.instrument method, it selects instrumenter from InstrumentationRegistry via InstrumentationRegistry.instance.instrumenter_for(notifier) for current notifier strategy (by default it uses fanout queue, which just pushes events to all registers subscribers).

InstrumenterRegistry extends ActiveSupport::PerThreadRegistry module so for each thread, it returns a separate instance whenever we call .instance. Each thread’s instance has a different Instrumenter class identifier by unique id (unique_id argument passed to subscribe block or callback). To get the current thread instrument’s id we need to call ActiveSupport::Notifications.instrumenter.id.

So basically to instrument events from the current thread, we need to its instrumenter’s id, and ignore events made by all other threads:

def instrument(&block)
  current_thread_instrumenter_id = ActiveSupport::Notifications.instrumenter.id

  requests = []
  subscriber = ActiveSupport::Notifications.subscribe('excon.request') do |_, _start, _end, instrumenter_id, payload|
    # Collect payloads for current thread only
    requests << payload if instrumenter_id == current_thread_instrumenter_id
  end

  block.call

  requests
ensure
  ActiveSupport::Notifications.unsubscribe(subscriber)
end