The Raspberry Pi RP2040 chip is a remarkable microcontroller. The programmable IO block (PIO) is only one of its many great features. In this post, I show you how to use PIO to capture the speed of an attached PC fan, including a software filter for the signal.
Most fans provide a feedback signal that lets you measure the actual speed of the fan. This signal is usually implemented as an open-collector output. Therefore, you can attach this line directly to a microcontroller and use its internal pull-up to read the signal.
Often, the signal returned from the fan contains some noise. Sometimes you get spikes that cross the logic threshold and therefore need to be filtered.
Implementation
The following code implements the RPM measurement as a PIO program and wraps it in a class.
    # Copyright (c) 2021 by Lucky Resistor. https://luckyresistor.me/
    # See LICENSE file for details.

    """
    Tool to capture RPM input from a FAN.
    """

    import rp2
    from micropython import const
    from machine import Pin


    # The frequency for the measurement.
    FREQ = const(1_000_000)
    # The counter start value.
    COUNTER_START = const(0x10000)


    @rp2.asm_pio(in_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=False, autopush=False)
    def measure_rpm():
        # Initialize X with 0x10000 for 16bit values
        label("switch_to_low")
        set(x, 1)
        mov(isr, x)
        in_(null, 16)
        mov(x, isr)

        # Initialize the ISR with 0x00000000
        set(y, 0)
        mov(isr, y)

        # Wait for a stable high
        label("wait_for_high")
        in_(pins, 1)
        set(isr, 0)
        mov(y, invert(isr))
        jmp(not_y, "switch_to_high")
        jmp(x_dec, "wait_for_high")

        # We ran into a timeout, send zero, keep waiting
        set(y, 0)
        mov(isr, y)
        push(noblock)
        jmp("switch_to_low")

        # Initialize the ISR with 0xFFFFFFFF
        label("switch_to_high")
        set(y, 0)
        mov(isr, invert(y))

        # Wait for a stable low
        label("wait_for_low")
        in_(pins, 1)
        set(isr, 0)
        mov(y, isr)
        jmp(not_y, "send_data")
        jmp(x_dec, "wait_for_low")

        # We ran into a timeout, send zero, restart.
        set(y, 0)
        mov(isr, y)
        push(noblock)
        jmp("switch_to_low")

        # Send the counter at this point
        label("send_data")
        mov(isr, x)
        push(noblock)

        # Repeat
        jmp("switch_to_low")


    class RpmInput:
        """
        Class to measure a RPM signal from a fan.
        """

        def __init__(self, sm_id: int, pin: int):
            """
            Create a new RPM input instance.

            :param sm_id: The state machine identifier to use.
            :param pin: The pin number used for the input.
            """
            self._sm = rp2.StateMachine(
                sm_id, measure_rpm, freq=FREQ,
                in_base=Pin(pin, mode=Pin.IN, pull=Pin.PULL_UP))
            self._sm.active(1)
            self._value = 0

        def poll(self):
            """
            Poll the interface. Call this method from the main loop as often as you like.
            """
            self._value = self._sm.get()

        def read_rpm(self):
            """
            Convert the last read value into an RPM value.

            :return: A RPM value, or zero if no signal received.
            """
            if self._value == 0:
                return 0
            counts = COUNTER_START - self._value
            # signals_per_rotation = 2
            # Formula is: 1 / ( counts * 5 * 1/frequency ) / signals_per_rotation * 60
            # Solves to: 6 * frequency / counts
            return 6 * FREQ // counts
The PIO Code
    @rp2.asm_pio(in_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=False, autopush=False)
    def measure_rpm():
        # Initialize X with 0x10000 for 16bit values
        label("switch_to_low")
        set(x, 1)
        mov(isr, x)
        in_(null, 16)
        mov(x, isr)
The code starts in line 19 with the decorator, which explicitly disables the auto-pull and auto-push features and adds a reminder for the shift direction (left is the default).
After the start, I initialise X with 0x10000. The set command is limited to five bits, so I set X to one and copy it to the ISR (lines 23 and 24), shift this bit to the right location (line 25), and then copy the result back to X.
Now, X is loaded with 0x10000. By changing the number of shift steps, it is easy to adjust the range of this counter, and with it the timeout. COUNTER_START in the wrapper class has to be changed to match.
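To get a feeling for what the shift count means in practice, here is a small sketch in plain Python (not PIO code). It assumes the 1 MHz state machine clock from this post and that one pass through a wait loop takes five instructions; the helper name counter_range is made up for this illustration, and the timeout is only approximate.

```python
FREQ = 1_000_000        # state machine clock in Hz, as in the listing
CYCLES_PER_COUNT = 5    # assumption: one pass through a wait loop is 5 instructions


def counter_range(shift_bits: int) -> tuple:
    """Return (start value, approximate timeout in seconds) for a shift count."""
    start = 1 << shift_bits                     # result of set(x, 1) plus in_(null, n)
    timeout = start * CYCLES_PER_COUNT / FREQ   # rough time until X reaches zero
    return start, timeout


for bits in (14, 16, 18):
    start, timeout = counter_range(bits)
    print(f"shift {bits}: start=0x{start:X}, timeout={timeout:.3f} s")
```

With 16 shift steps the counter runs for roughly a third of a second before the timeout path is taken, which limits how slow a fan can be and still produce a reading.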
        # Initialize the ISR with 0x00000000
        set(y, 0)
        mov(isr, y)
Next, I initialise the ISR register with zero. The register is used to act as a filter.
        # Wait for a stable high
        label("wait_for_high")
        in_(pins, 1)
        set(isr, 0)
        mov(y, invert(isr))
        jmp(not_y, "switch_to_high")
        jmp(x_dec, "wait_for_high")
The input is read and its value is shifted into the ISR register (line 34). Next, I reset the read counter for the ISR, which is necessary to keep the bits shifting (line 35).
Now I copy the current value of the register into Y, but invert all bits (line 36). If there were 32 consecutive reads of 1, the register Y is therefore zero.
If register Y is zero, I jump to the detection of the next phase (line 37). If not, I decrement X and, as long as X had not yet reached zero, repeat this block (line 38).
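The filter idea can be tried out on a desktop with a small plain Python model. This is only a sketch of the shift-register logic, not of the PIO timing; the function name first_stable_high and the sample data are made up for this illustration.

```python
MASK32 = 0xFFFFFFFF


def first_stable_high(samples):
    """Return the index of the sample that completes 32 consecutive highs, or None."""
    isr = 0                                        # the ISR starts cleared
    for index, bit in enumerate(samples):
        isr = ((isr << 1) | (bit & 1)) & MASK32    # models in_(pins, 1), shifting left
        if (~isr & MASK32) == 0:                   # models mov(y, invert(isr)) and jmp(not_y, ...)
            return index
    return None


# A rising edge with a glitch: 10 highs, one low sample, then a steady high level.
bouncy_edge = [0] * 5 + [1] * 10 + [0] + [1] * 40
# A short spike on an otherwise low line.
spike = [0] * 10 + [1] * 3 + [0] * 50

print(first_stable_high(bouncy_edge))
print(first_stable_high(spike))
```

In this model, the single low sample restarts the run of highs, so the edge is only accepted 32 samples after the glitch, and the three-sample spike is never accepted at all.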
        # We ran into a timeout, send zero, keep waiting
        set(y, 0)
        mov(isr, y)
        push(noblock)
        jmp("switch_to_low")
The block above is executed in case of a timeout. Here I push a zero into the FIFO and jump back to the start of the code. The code then keeps waiting for a high level on the line and keeps pushing zeros into the FIFO to indicate a timeout.
        # Initialize the ISR with 0xFFFFFFFF
        label("switch_to_high")
        set(y, 0)
        mov(isr, invert(y))

        # Wait for a stable low
        label("wait_for_low")
        in_(pins, 1)
        set(isr, 0)
        mov(y, isr)
        jmp(not_y, "send_data")
        jmp(x_dec, "wait_for_low")

        # We ran into a timeout, send zero, restart.
        set(y, 0)
        mov(isr, y)
        push(noblock)
        jmp("switch_to_low")
This is almost the same code as before, but in this case I initialise the ISR with ones (line 49). Also, because I test for zeroes, I copy the ISR contents without inverting them (line 55).
If the transition from high to low is successfully detected, the code continues to send the counter value into the FIFO (line 56).
        # Send the counter at this point
        label("send_data")
        mov(isr, x)
        push(noblock)

        # Repeat
        jmp("switch_to_low")
The remaining count in X is copied to the ISR, which is then used to push the data into the FIFO, where the application can read it.
This simple code allows stable RPM measurements, even if there are short spikes and other minor problems in the signal from the fan.
It will run into a timeout if the signal is lost or if there is only noise on the line.
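To illustrate the whole measurement cycle, the following plain Python sketch models one pass of the program over a list of samples. It is a simplified model under stated assumptions: one sample per pass through a wait loop, and the few instructions between the phases are ignored. The name measure_once and the test signals are made up for this illustration.

```python
MASK32 = 0xFFFFFFFF
COUNTER_START = 0x10000


def measure_once(samples):
    """Model one pass of the PIO program.

    Returns the remaining count, 0 on timeout, or None if the samples run out.
    """
    x = COUNTER_START
    isr, target = 0, MASK32              # phase 0: wait for 32 consecutive highs
    phase = 0
    for bit in samples:
        isr = ((isr << 1) | (bit & 1)) & MASK32
        if isr == target:
            if phase == 1:
                return x                 # stable low found: send the counter
            phase = 1
            isr, target = MASK32, 0      # phase 1: preload ones, wait for 32 lows
            continue
        if x == 0:
            return 0                     # timeout: the program pushes zero
        x -= 1
    return None


# One period of a 100 Hz signal sampled every 5 us: 1000 samples low, 1000 high.
# The pass starts 32 samples into the low phase, after the previous stable low.
clean = [0] * 968 + [1] * 1000 + [0] * 1000
# The same period with a three-sample spike in the low phase.
noisy = [0] * 400 + [1] * 3 + [0] * 565 + [1] * 1000 + [0] * 1000

for name, signal in (("clean", clean), ("noisy", noisy)):
    value = measure_once(signal)
    print(name, COUNTER_START - value)
```

In this model, both signals produce the same count, because the short spike never fills the 32-bit filter. A line that never settles at one level for 32 samples ends in the timeout path instead.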
The Wrapper Class
The wrapper class simplifies the usage of the PIO code and converts the output into an approximation of an RPM value.
    class RpmInput:
        """
        Class to measure a RPM signal from a fan.
        """

        def __init__(self, sm_id: int, pin: int):
            """
            Create a new RPM input instance.

            :param sm_id: The state machine identifier to use.
            :param pin: The pin number used for the input.
            """
            self._sm = rp2.StateMachine(
                sm_id, measure_rpm, freq=FREQ,
                in_base=Pin(pin, mode=Pin.IN, pull=Pin.PULL_UP))
            self._sm.active(1)
            self._value = 0
A new instance is created with the identifier for the state machine and the pin number. The code then creates the state machine with the right frequency and also initialises the pin with the correct settings.
At the end, the state machine is activated and the value buffer is initialised with zero.
        def poll(self):
            """
            Poll the interface. Call this method from the main loop as often as you like.
            """
            self._value = self._sm.get()

        def read_rpm(self):
            """
            Convert the last read value into an RPM value.

            :return: A RPM value, or zero if no signal received.
            """
            if self._value == 0:
                return 0
            counts = COUNTER_START - self._value
            # signals_per_rotation = 2
            # Formula is: 1 / ( counts * 5 * 1/frequency ) / signals_per_rotation * 60
            # Solves to: 6 * frequency / counts
            return 6 * FREQ // counts
Because of the FIFO, it is required to continuously read values from the buffer to get current measurements. Therefore, I added a poll method. This method should be called in the main loop.
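Note that get() returns the oldest queued word and blocks while the FIFO is empty. One possible variation, sketched here and not part of the code above, is to drain the FIFO and keep only the newest word. It relies on the rx_fifo() method of rp2.StateMachine, which reports the number of waiting words. The helper read_latest and the FakeStateMachine stand-in are hypothetical and exist only so the sketch also runs on a desktop Python.

```python
def read_latest(sm, fallback):
    """Return the newest word waiting in the RX FIFO, or fallback if it is empty."""
    value = fallback
    while sm.rx_fifo() > 0:    # number of words currently waiting
        value = sm.get()       # returns at once here, because a word is available
    return value


class FakeStateMachine:
    """Hypothetical stand-in for rp2.StateMachine, for trying this off-device."""

    def __init__(self, words):
        self._words = list(words)

    def rx_fifo(self):
        return len(self._words)

    def get(self):
        return self._words.pop(0)


sm = FakeStateMachine([63538, 63540, 63536])
print(read_latest(sm, 0))   # newest queued word
print(read_latest(sm, 0))   # FIFO is empty now, so the fallback is returned
```

Inside the class, poll could then call read_latest(self._sm, self._value), so an empty FIFO simply keeps the previous value.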
The method read_rpm just uses the last read value and converts it into an RPM value. This conversion is not perfect, because the calculation ignores the cycles for pushing the data and initialising the registers. It is close enough for monitoring the speed of a fan and keeps the calculations to a minimum.
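As a worked example of the formula in the comment, the following plain Python sketch does the conversion step by step and compares it with the shortened integer form. The constant names CYCLES_PER_COUNT and SIGNALS_PER_ROTATION and the function rpm_from_value are made up for this illustration; two signals per rotation is the assumption stated in the code comment.

```python
FREQ = 1_000_000            # state machine clock in Hz
COUNTER_START = 0x10000     # counter start value
CYCLES_PER_COUNT = 5        # instructions per pass through a wait loop
SIGNALS_PER_ROTATION = 2    # assumption taken from the comment in read_rpm


def rpm_from_value(value: int) -> int:
    """Convert a value read from the FIFO into RPM, following the long formula."""
    if value == 0:
        return 0                                     # timeout marker
    counts = COUNTER_START - value                   # loop passes per signal period
    period = counts * CYCLES_PER_COUNT / FREQ        # seconds per signal period
    return round(60 / (period * SIGNALS_PER_ROTATION))


value = COUNTER_START - 2000        # 2000 passes of 5 us each, a 10 ms period
counts = COUNTER_START - value
print(rpm_from_value(value), 6 * FREQ // counts)
```

Both forms give the same result for this value; the integer form only differs by rounding down instead of rounding to the nearest value.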
Conclusion
I hope you found this code useful. 😄 Let me know if you have ideas for improvements or found new uses for the filter code.