Posted in raspberrypi, aws


Have you ever had a lot of data in your code that would sit better in a file? Why not try something else?

If I can, I always avoid storing information locally, especially when I need to change something remotely. Recently I made Conway's Game of Life and I want the patterns to be accessible by other devices. This will allow me to create new patterns, store them and reuse them as they are added.

For any thought, question or problem, use the comment section and tell me what you think.

What do I need to get started?

If you never had any contact with AWS (Amazon Web Services), you can create an account for free at aws.amazon.com. 

DynamoDB is a NoSQL database where you can store and retrieve data, along with other capabilities. The best part is that you can use 25 GB of storage and perform 25 reads and writes per second for free. If these values become outdated, just take a look at what the free tier has to offer.

In order to save and get items from DynamoDB, the Raspberry Pi needs the right permissions to access AWS. Go to the AWS Console, create a user (or use an existing one) and attach the AmazonDynamoDBFullAccess policy to it. If you create a new user, don't forget to save the access key id and secret access key; we need them later. That's the only thing we need to do from the AWS Console.

Take me to the code

Before jumping directly into the code, I will briefly describe what we will do. I will integrate DynamoDB into one of my old projects, Game of Life. However, you can apply the concepts to any other project.

I divided my code into two parts. The first one is responsible for seeding the table with data. The second one, as you might suspect, is responsible for reading the saved information.

In the code below, don't forget to replace the values for AWS_ACCESS_KEY and AWS_SECRET_ACCESS.
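If you prefer not to paste the keys into the script, one possible alternative is to read them from environment variables. This is only a sketch and not part of the original project: loadAwsCredentials is a hypothetical helper, while AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the standard variable names that boto3 also looks for on its own.

```python
import os

def loadAwsCredentials(environ=os.environ):
    # hypothetical helper: returns (access key id, secret access key)
    # and fails early when one of the two variables is missing
    accessKey = environ.get('AWS_ACCESS_KEY_ID')
    secretKey = environ.get('AWS_SECRET_ACCESS_KEY')
    if not accessKey or not secretKey:
        raise RuntimeError('Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY first')
    return accessKey, secretKey
```

The returned pair can then be passed to boto3.client in place of the two constants.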

Seeding the table

This part can run on any device you want: your PC or any other board that comes in handy.

It is very simple, as you can see from the code. We just need to create a client for DynamoDB using boto3. Once we have the client, we create the table and save the data in it.

Because of my project, I need to save matrices. DynamoDB does not support matrices directly (I don't know if any database supports them), so I need a serializer/deserializer. I didn't want to pollute the code with my own version, so I used pickle instead.

I created three patterns for Game of Life and stored them in the GameOfLifePatterns table.
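To make the serialization step concrete, here is a minimal round trip of a small matrix through pickle. The base64 step and the names serializePattern and deserializePattern are my assumptions for this sketch, not something the project's scripts use: they make the result plain ASCII text on both Python 2 and Python 3, which is what a DynamoDB string attribute expects.

```python
import base64
import pickle

def serializePattern(pattern):
    # pickle turns the nested list into bytes,
    # base64 turns those bytes into ASCII text
    raw = pickle.dumps(pattern, protocol=2)
    return base64.b64encode(raw).decode('ascii')

def deserializePattern(text):
    # reverse the two steps to get the nested list back
    raw = base64.b64decode(text.encode('ascii'))
    return pickle.loads(raw)

blinker = [
    [0, 1, 0],
    [0, 1, 0],
    [0, 1, 0]
]
stored = serializePattern(blinker)
restored = deserializePattern(stored)
```

Keep in mind that pickle should only be used to load data you wrote yourself, because loading a pickle from an untrusted source can execute arbitrary code.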

import boto3
import pickle

AWS_ACCESS_KEY = 'REPLACE_ME'
AWS_SECRET_ACCESS = 'REPLACE_ME'
TABLE_NAME = 'GameOfLifePatterns'

def createTable(client):
    client.create_table(
        TableName=TABLE_NAME,
        KeySchema=[
            {
                'AttributeName': 'pattern-id',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'pattern-id',
                'AttributeType': 'N'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    client.get_waiter('table_exists').wait(TableName=TABLE_NAME)
    print("Table with name %s created.\n" % TABLE_NAME)

def putPattern(client, id, pattern):
    # under Python 2, pickle.dumps returns an ASCII string by default,
    # so it can be stored in a string ('S') attribute
    client.put_item(
        TableName=TABLE_NAME,
        Item={
            'pattern-id': { 'N': id },
            'pattern': { 'S': pickle.dumps(pattern) }
        }
    )
    print("Item with id %s and pattern %s saved.\n" % (id, pattern))

if __name__ == '__main__':
    try:
        dynamodbClient = boto3.client(
            'dynamodb',
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_ACCESS,
            region_name='eu-west-1'
        )

        # just delete/comment this line if you already have the table
        # and you only need to save new things
        createTable(dynamodbClient)

        pulsy = [
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 1, 0, 0, 1, 1],
            [0, 0, 0, 1, 0, 1, 0, 1],
            [0, 0, 0, 0, 0, 1, 1, 0]
        ]

        toad = [
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 1, 0, 0],
            [0, 0, 1, 1, 1, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0]
        ]

        beacon = [
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0]
        ]

        # and many others

        putPattern(dynamodbClient, "0", pulsy)
        putPattern(dynamodbClient, "1", toad)
        putPattern(dynamodbClient, "2", beacon)

        # close execution by pressing CTRL + C
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        print("Program stopped")

I saved the above code in a save-patterns.py file. Run it to save the patterns:

python save-patterns.py

Let's use the stored data

The data can be used by almost any device, as long as it supports a connection to the internet.

For my project, I just replaced the hard-coded pattern (stored in a variable) with a method that retrieves it from the DynamoDB table. I chose to randomly use one of the patterns stored in DynamoDB.

import time
import boto3
import pickle
import random

from luma.led_matrix.device import max7219
from luma.core.interface.serial import spi, noop
from luma.core.render import canvas

AWS_ACCESS_KEY = 'REPLACE_ME'
AWS_SECRET_ACCESS = 'REPLACE_ME'
TABLE_NAME = 'GameOfLifePatterns'

UNIVERSE_WIDTH = 8
UNIVERSE_HEIGHT = 8

def isAlive(cell):
    return cell == 1

def drawMatrix(device, universe):
    with canvas(device) as draw:
        for x in range(UNIVERSE_WIDTH):
            for y in range(UNIVERSE_HEIGHT):
                fill = 'white' if isAlive(universe[x][y]) else 'black'
                draw.point((x,y), fill=fill)

def countNeighbors(universe, cellX, cellY):
    count = 0
    neighborsRadius =[-1, 0, 1]
    for x in neighborsRadius:
        neighbordPositionX = cellX + x
        if neighbordPositionX >= UNIVERSE_WIDTH or neighbordPositionX < 0:
            continue

        for y in neighborsRadius:
            neighbordPositionY = cellY + y
            if neighbordPositionY >= UNIVERSE_HEIGHT or neighbordPositionY < 0:
                continue

            if cellX != neighbordPositionX or cellY != neighbordPositionY:
                count += universe[neighbordPositionX][neighbordPositionY]

    return count

def gameOfLife(universe):
    # copy the universe so that we can modify it without worry
    newUniverse = [row[:] for row in universe]
    for cellX in range(UNIVERSE_WIDTH):
        for cellY in range(UNIVERSE_HEIGHT):
            neighbors = countNeighbors(universe, cellX, cellY)

            if isAlive(newUniverse[cellX][cellY]):
                # Any live cell with fewer than two live neighbors dies, as if by under population.
                # Any live cell with more than three live neighbors dies, as if by overpopulation.
                if neighbors < 2 or neighbors > 3:
                    newUniverse[cellX][cellY] = 0
                # Any live cell with two or three live neighbors lives on to the next generation.
                elif neighbors == 2 or neighbors == 3:
                    newUniverse[cellX][cellY] = 1
            # Any dead cell with exactly three live neighbors becomes a live cell, as if by reproduction.
            elif neighbors == 3:
                newUniverse[cellX][cellY] = 1

    return newUniverse

def getDevice(n, block_orientation, rotate):
    # create matrix device
    serial = spi(port=0, device=0, gpio=noop())
    device = max7219(serial, cascaded=n or 1, block_orientation=block_orientation, rotate=rotate or 0)
    print("Created device")
    return device

def getPattern(client, id):
    response = client.get_item(
        TableName=TABLE_NAME,
        Key={
            'pattern-id': { 'N': str(id) }
        }
    )
    pattern = response['Item']['pattern'][u'S']
    return pickle.loads(pattern)

def getNumberOfItems(client):
    response = client.scan(
        TableName=TABLE_NAME
    )
    itemCount = response['Count']
    return itemCount

if __name__ == '__main__':
    try:
        dynamodbClient = boto3.client(
            'dynamodb',
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_ACCESS,
            region_name='eu-west-1'
        )

        # replace 1 with the number of devices
        device = getDevice(1, 0, 0)

        # get a random pattern from DynamoDB
        # ids start at 0, so the last valid id is numberOfPatterns - 1
        numberOfPatterns = getNumberOfItems(dynamodbClient)
        universe = getPattern(dynamodbClient, random.randint(0, numberOfPatterns - 1))

        while 1:
            drawMatrix(device, universe)
            time.sleep(1)
            universe = gameOfLife(universe)

        # close execution by pressing CTRL + C
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        print("Program stopped")

I saved the above code in a game-of-life.py file. Run it using the command below to play a random pattern from DynamoDB.

python game-of-life.py
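A small sketch of how a random pattern id can be chosen, assuming the ids run from 0 to the number of stored patterns minus one, as in the seeding script. pickPatternId is a hypothetical helper. It uses random.randrange, which never returns the upper bound, so every id it produces exists in the table.

```python
import random

def pickPatternId(numberOfPatterns, rng=random):
    # randrange(n) returns an integer between 0 and n - 1
    if numberOfPatterns <= 0:
        raise ValueError('the table is empty')
    return rng.randrange(numberOfPatterns)

# with three stored patterns the result is always 0, 1 or 2
patternId = pickPatternId(3)
```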

Conclusion

Storing data somewhere other than the local device is very useful, especially when you have multiple devices accessing the same source of data.

This becomes even more useful when you have devices that store sensor data to be used later by other devices, or even by an interface.

You managed to go through my poor writing skills, brave one. 

Posted in raspberrypi, led

(photo made by Sfecles Petru)

Have you ever created a game on a Raspberry Pi?

For me, it's the first one, and it's a zero-player game called Game of Life, also known as Conway's Game of Life.

The game starts with a pattern (the initial input), which is the only interaction that the user/creator will have with the game. Next, you sit back and relax, observing how the initial pattern evolves over time.

Let's see how it evolves! Feel free to use the comment section for any question, curiosity or problem.

The hardware to play with

As the title suggests, you need a Raspberry Pi. You can use any variant you want; for this project I will use a Raspberry Pi Zero.

The Game of Life works in an infinite 2D space, but to keep this project simple I chose to use an 8x8 LED matrix. You are not limited to that, so feel free to use as many matrices as you like, of any size.

Raspberry Pi and MAX7219 LED Matrix connections

Because I'm using a matrix with a MAX7219 (SPI) the following table shows how to create the connections. If you are using WS2812 NeoPixels (DMA)/NeoSegments please use the following instructions to make the connections.

#   MAX7219    Raspberry Pi Zero
1   VCC (5V)   VCC 5V (PIN 2)
2   GND        GND (PIN 6)
3   DIN        GPIO10 with MOSI (PIN 19)
4   CS         GPIO8 with SPI CE0 (PIN 24)
5   CLK        GPIO11 with SPI CLK (PIN 23)

The code evolves

The game's universe is governed by 4 well-defined laws. The universe is formed by a grid of cells/dots which can be dead or alive. For this project, the dead cells represent turned off LEDs and the live cells are represented by bright LEDs. On each iteration, the laws are applied and the shape changes. The laws:

  1. Any live cell with fewer than two live neighbors dies as if caused by underpopulation.
  2. Any live cell with two or three live neighbors lives on to the next generation.
  3. Any live cell with more than three live neighbors dies, as if by overpopulation.
  4. Any dead cell with exactly three live neighbors becomes a live cell, as if by reproduction.
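The four laws can be condensed into a single function that decides the next state of one cell. This is only a sketch: nextState is a hypothetical helper that takes the current state (1 for alive, 0 for dead) and the number of live neighbors.

```python
def nextState(alive, liveNeighbors):
    if alive:
        # laws 1, 2 and 3: a live cell survives only with 2 or 3 neighbors
        return 1 if liveNeighbors in (2, 3) else 0
    # law 4: a dead cell comes to life with exactly 3 neighbors
    return 1 if liveNeighbors == 3 else 0

# a live cell with one neighbor dies, a dead cell with three is born
lonely = nextState(1, 1)
born = nextState(0, 3)
```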

We need to enable SPI. An easy tutorial can be found in the luma.led_matrix documentation. If you don't want to open the link, just run sudo raspi-config and enable SPI in Interfacing Options. Then install the luma.led_matrix library for Python by running the following commands and we are ready to go:

sudo usermod -a -G spi,gpio pi
sudo apt-get install build-essential python-dev python-pip libfreetype6-dev libjpeg-dev
sudo -H pip install --upgrade luma.led_matrix

In the code, we start with the initial universe, draw it on our matrix and then apply the laws of the universe. After this, a new universe is born and the process starts again by displaying it.

When I created the device, I encountered some problems (still a mystery) with the initialization. The only way to make it work was to return it from a function and I don't have any idea why it works.

I created 3 main functions: one that lights the LEDs of the matrix (drawMatrix), one that applies the laws of the universe (gameOfLife) and one that counts the neighbors of a cell in the matrix (countNeighbors).

Feel free to modify the UNIVERSE_WIDTH and UNIVERSE_HEIGHT constants for your project.

With a few modifications you can use other devices, so feel free to adapt the code to your use case. If you are not using a MAX7219, a good start is to replace the getDevice function with the respective code, which can be easily obtained from the luma documentation.

import time

from luma.led_matrix.device import max7219
from luma.core.interface.serial import spi, noop
from luma.core.render import canvas

UNIVERSE_WIDTH = 8
UNIVERSE_HEIGHT = 8

def isAlive(cell):
    return cell == 1

def drawMatrix(device, universe):
    with canvas(device) as draw:
        for x in range(UNIVERSE_WIDTH):
            for y in range(UNIVERSE_HEIGHT):
                fill = 'white' if isAlive(universe[x][y]) else 'black' 
                draw.point((x,y), fill=fill)

def countNeighbors(universe, cellX, cellY):
    count = 0
    neighborsRadius =[-1, 0, 1]
    for x in neighborsRadius:
        neighbordPositionX = cellX + x
        if neighbordPositionX >= UNIVERSE_WIDTH or neighbordPositionX < 0:
            continue

        for y in neighborsRadius:
            neighbordPositionY = cellY + y
            if neighbordPositionY >= UNIVERSE_HEIGHT or neighbordPositionY < 0:
                continue

            if cellX != neighbordPositionX or cellY != neighbordPositionY:
                count += universe[neighbordPositionX][neighbordPositionY]
    
    return count

def gameOfLife(universe):
    # copy the universe so that we can modify it without worry 
    newUniverse = [row[:] for row in universe] 
    for cellX in range(UNIVERSE_WIDTH):
        for cellY in range(UNIVERSE_HEIGHT):
            neighbors = countNeighbors(universe, cellX, cellY)

            if isAlive(newUniverse[cellX][cellY]):
                # Any live cell with fewer than two live neighbors dies, as if by under population.
                # Any live cell with more than three live neighbors dies, as if by overpopulation.
                if neighbors < 2 or neighbors > 3:
                    newUniverse[cellX][cellY] = 0
                # Any live cell with two or three live neighbors lives on to the next generation.
                elif neighbors == 2 or neighbors == 3:
                    newUniverse[cellX][cellY] = 1
            # Any dead cell with exactly three live neighbors becomes a live cell, as if by reproduction.
            elif neighbors == 3:
                newUniverse[cellX][cellY] = 1
    
    return newUniverse


def getDevice(n, block_orientation, rotate):
    # create matrix device
    serial = spi(port=0, device=0, gpio=noop())
    device = max7219(serial, cascaded=n or 1, block_orientation=block_orientation, rotate=rotate or 0)
    return device

if __name__ == '__main__':
    try:   

        device = getDevice(1, 0, 0)

        # 1/4 pulsar, feel free to replace/add different life forms
        universe = [
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 1, 0, 0, 1, 1],
            [0, 0, 0, 1, 0, 1, 0, 1],
            [0, 0, 0, 0, 0, 1, 1, 0]
        ]

        while 1:
            drawMatrix(device, universe)
            time.sleep(1)
            universe = gameOfLife(universe)
 
        # close execution by pressing CTRL + C
    except KeyboardInterrupt:
        print("Interrupted by user")
        pass
    finally:
        print("Program stopped")

Running the code above will light the LEDs in the shape of a quarter of a pulsar. I saved the code in a led-matrix.py file. Run it using the command below and press CTRL + C to cancel at any time.

python led-matrix.py

A quarter of a pulsar

Conclusion

Besides the obvious extension of adding more matrices, another one is to make the universe 'round': when the bounds are crossed, the game continues from the opposite side of the border.

This game can be moved to different sources of light, such as a disco ball or a room full of lights, to make custom patterns from an initial input, maybe given by touch.
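A possible way to get the 'round' universe is to wrap the neighbor coordinates with the modulo operator instead of skipping the ones that fall outside the grid. The function below is a sketch and not part of the project; countNeighborsWrapped is a hypothetical name, and it assumes a grid of at least 3x3 cells so that no neighbor is counted twice.

```python
def countNeighborsWrapped(universe, cellX, cellY):
    width = len(universe)
    height = len(universe[0])
    count = 0
    for dx in (-1, 0, 1):
        for dy in (-1, 0, 1):
            if dx == 0 and dy == 0:
                continue  # skip the cell itself
            # the modulo sends -1 to the last index and width to 0
            count += universe[(cellX + dx) % width][(cellY + dy) % height]
    return count

universe = [
    [1, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 0]
]
# the opposite corner sees the live cell at [0][0] as a neighbor
cornerNeighbors = countNeighborsWrapped(universe, 3, 3)
```

Swapping this in for countNeighbors would leave the rest of the game loop unchanged.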

Thank you for reading and stay bright.

Posted in raspberrypi, telegram, bot

How is Telegram different from WhatsApp?(photo made by Sfecles Petru)

I spent some time trying to figure out how to create a Telegram bot. I really got lost in the official documentation. Then I found the framework that made everything easier, telepot.

What is Telegram? Telegram is a messaging app that is simple to use and has security as one of its main focuses. This sounds very similar to WhatsApp, but you can find here how it's different.

I find bots very useful, especially when I am trying to remotely control something in my house. If you like creating bots, I wrote a similar post that uses a Discord bot.

The simple hardware 

As you can see in the image below, the circuit is very simple. When you connect the LED to the Raspberry Pi, don't forget to add the resistor (I use one of 1K ohm).

 LED connected to Raspberry Pi Zero

Control your bot like a 'boss'

But first, let's create the bot by chatting with @BotFather (they have a good sense of humor), the bot above all bots. Send the /newbot command and follow the steps to configure the name and the id of the bot. I named my bot TheHumbleCodeBot. After you finish these steps you will be given a unique token. Save it, we will need it later.

Creating a telegram bot with the help of BotFather

Now, we have to install telepot. It supports multiple versions of Python. You can find more details in the introduction of the telepot framework.

pip install telepot

The good thing is that we don't have to make requests ourselves to see if someone wrote something in the chat. Instead, we can use a built-in functionality from telepot, MessageLoop, which keeps fetching updates from Telegram in the background and applies a handling function to every message received.

import RPi.GPIO as GPIO
import telepot
import time
from telepot.loop import MessageLoop

# use channel numbers on the Broadcom SOC
GPIO.setmode(GPIO.BCM)

LED_PIN = 19
GPIO.setup(LED_PIN, GPIO.OUT)

# replace with the token you received from @BotFather and keep it private
bot = telepot.Bot('REPLACE_WITH_YOUR_BOT_TOKEN')

def turnOn(chatId):
    GPIO.output(LED_PIN, True)
    bot.sendMessage(chatId, 'The led is ON now!')

def turnOff(chatId):
    GPIO.output(LED_PIN, False)
    bot.sendMessage(chatId, 'The led is OFF now!')

def handleCommand(msg):
    contentType, chatType, chatId = telepot.glance(msg) # extract "headline info"
    print(contentType, chatType, chatId)

    if contentType != 'text':
        return # nothing to do

    message = msg['text']

    if not message.startswith('!'):
        return # not a command
    
    command = message[1:].lower() # remove ! from the command name
    if command == 'on':
        turnOn(chatId)
    elif command == 'off':
        turnOff(chatId)

if __name__ == '__main__':
    try:
        MessageLoop(bot, handleCommand).run_as_thread()

        while 1:
            # just don't finish the program
            time.sleep(10)
        # close execution by pressing CTRL + C
    except KeyboardInterrupt:
        print("Interrupted by user")
        pass
    finally:
        print("Program stopped")
        GPIO.cleanup()

Starting the bot is pretty easy: save the above code in a file named bot.py and run it using the following command in the console:

python bot.py
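As the number of commands grows, the if/elif chain in handleCommand can be replaced by a lookup table. The sketch below shows the idea without any Telegram or GPIO code so that it can be tried on any machine. parseCommand and dispatch are hypothetical helpers, and the extra strip() call is my addition to tolerate stray spaces.

```python
def parseCommand(text, prefix='!'):
    # returns the command name without the prefix, or None
    if not text.startswith(prefix):
        return None
    return text[len(prefix):].strip().lower()

def dispatch(text, handlers):
    # looks up the handler for the command and calls it
    handler = handlers.get(parseCommand(text))
    if handler is None:
        return False  # not a command, or an unknown one
    handler()
    return True

log = []
handlers = {
    'on': lambda: log.append('led on'),
    'off': lambda: log.append('led off')
}
dispatch('!ON', handlers)
dispatch('hello bot', handlers)
dispatch('!off', handlers)
```

In the bot, the lambdas would call turnOn(chatId) and turnOff(chatId) instead of appending to a list.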

In order to chat with your bot, replace the bot id in the following link: https://web.telegram.org/#/im?p=@YOUR_BOT_ID (mine is TheHumbleCodeBot).

Chatting with the telegram bot

Conclusion

Bots have a lot of potential, especially when you want to control something in your home or just receive updates. You can attach anything you want to your bot: temperature or motion sensors, LED matrices and many others.

I hope this post helps you to create a bot much easier for your current/future projects.

Thank you for reading, and never stop receiving knowledge!

Posted in raspberrypi, led

How to make a linear color transition with an RGB LED(photo made by Sfecles Petru)

Why another post about RGB LEDs when there are thousands already? I searched everywhere, but I couldn't find anything about color transitions for an RGB LED.

Usually, when changing the color of an RGB LED, everything happens in an instant (e.g., RED to BLUE). This can be unpleasant, even irritating, for the eye. Depending on the situation, you may need a beautiful and calm change in color that is more appealing to the eye (and, why not, ultimately to the brain).

The pin connections.

I use a Raspberry Pi Zero and an RGB LED connected to PWM pins. Don't be afraid to use any other Raspberry Pi version, or to adapt the project to any other board type you are using (e.g., Arduino).

RGB LED connected to Raspberry Pi Zero

The common node of the RGB LED I'm using is the anode. I connected it to the 5V pin and the other pins to PWM. If the common node of your RGB LED is the cathode, attach it to GND.

The table below shows the pin associations that I made.

RGB LED   Signal   Raspberry Pi Zero
1         RED      GPIO 12 (PWM)
2         GREEN    GPIO 13 (PWM)
3         BLUE     GPIO 19 (PWM)
4         Anode    2 (5V VCC)

The code.

During a transition I have multiple intermediary colors between the current color and the end color: [currentColor, color1, color2, ... , colorN, targetColor]. I'm using the color HEX codes to represent the colors. It can be replaced very easily with RGB color model if that fits your project better.

The number of intermediary colors is given by the duration of a transition and the number of frames per second (fps). As an example, a duration of 2s at 30 fps results in ~60 intermediary colors (steps).

Each step will increment/decrement the current color in order to reach the desired color. This is very similar to a linear function and we can say that this is a linear color transition. 

The first thing is to find the distance from the current color to the new color. The distance is computed for each color from the LED (Red, Green, and Blue). The distance is used to compute the incrementing value at each step. The incrementing value is applied individually per color. This way the LED will make a linear transition to the new color.
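The idea can be tried without any hardware by computing the list of intermediary colors up front. The function below is a sketch under the assumptions of the text (steps = duration x fps, one constant increment per channel). intermediateColors is a hypothetical name and is not used by the script in this post.

```python
def intermediateColors(currentColor, targetColor, duration, fps):
    # e.g. 2 seconds at 30 fps gives 60 steps
    steps = max(1, int(duration * fps))
    colors = []
    for step in range(1, steps + 1):
        ratio = step / float(steps)
        # move every channel the same fraction of its own distance
        colors.append([
            current + (target - current) * ratio
            for current, target in zip(currentColor, targetColor)
        ])
    return colors

frames = intermediateColors([0x00, 0x00, 0x00], [0x3C, 0x78, 0x1E], 2, 30)
# frames has 60 entries and the last one is the target color
```

Each entry of frames could then be handed to a function like setColor, with a pause of 1 / fps seconds between entries.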

In order to control the intensity of each LED, I transform the hex color value into a percentage. This way, it can be easily passed to the duty cycle function.

The RGB LEDs that I have work with a maximum frequency of 100Hz.

The code works for both types of RGB LEDs, with a common cathode or a common anode. By default it uses the anode (COMMON_NODE = 1), but you can switch to the cathode by changing the constant COMMON_NODE to 0.
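The conversion from a color value to a duty cycle can be checked on its own. In this sketch dutyCycle is a hypothetical helper that mirrors what hexPercent and setColor do together: with a common anode the LED is lit while the pin is low, so the percentage has to be inverted.

```python
def dutyCycle(colorValue, commonAnode):
    # 0x00..0xFF mapped to 0..100 percent
    percent = (colorValue / 255.0) * 100
    if commonAnode:
        # common anode: a lower duty cycle means a brighter channel
        percent = 100 - percent
    return percent

fullCathode = dutyCycle(0xFF, False)
fullAnode = dutyCycle(0xFF, True)
```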

I added a list of colors that demonstrate the behavior, you can change this with the colors that you want.

import RPi.GPIO as GPIO
import time

# use channel numbers on the Broadcom SOC
GPIO.setmode(GPIO.BCM)

# defining the pins
GPIO_RED = 12
GPIO_GREEN = 13
GPIO_BLUE = 19

# defining the pins as output
GPIO.setup(GPIO_RED, GPIO.OUT)
GPIO.setup(GPIO_GREEN, GPIO.OUT)
GPIO.setup(GPIO_BLUE, GPIO.OUT)

# choosing a frequency for pwm
PWM_FREQUENCY = 50

# configure to use PWM
COLOR_PWM = [ 
    GPIO.PWM(GPIO_RED, PWM_FREQUENCY),
    GPIO.PWM(GPIO_GREEN, PWM_FREQUENCY),
    GPIO.PWM(GPIO_BLUE, PWM_FREQUENCY)
]

# For CATHODE this must be 0
# For ANODE this must be 1
COMMON_NODE = 1

def transition(currentColor, targetColor, duration, fps):
    # total number of steps, e.g. 2s at 30 fps gives 60 steps
    steps = max(1, int(duration * fps))
    distance = colorDistance(currentColor, targetColor)
    increment = calculateIncrement(distance, fps, duration)

    for i in range(steps):
        transitionStep(currentColor, targetColor, increment)
        # each frame lasts 1/fps seconds
        time.sleep(1.0 / fps)

def colorDistance(currentColor, targetColor):
    distance = [0, 0, 0]

    for i in range(len(currentColor)):
        distance[i] = abs(currentColor[i] - targetColor[i])

    return distance

def calculateIncrement(distance, fps, duration):
    increment = [0, 0, 0]
    # the distance is spread evenly over all the steps
    steps = max(1, int(duration * fps))

    for i in range(len(distance)):
        inc = abs(distance[i] / float(steps))
        increment[i] = inc
    return increment

def transitionStep(currentColor, targetColor, increment):
    for i in range(len(currentColor)):
        if currentColor[i] > targetColor[i]:
            currentColor[i] -= increment[i]
            if currentColor[i] <= targetColor[i]:
                increment[i] = 0
        else:
            currentColor[i] += increment[i]
            if currentColor[i] >= targetColor[i]:
                increment[i] = 0
    setColor(currentColor)

def setColor(color):
    for i in range(len(COLOR_PWM)):
        percent = hexPercent(color[i])
        if COMMON_NODE:
            percent = 100 - percent
        COLOR_PWM[i].ChangeDutyCycle(percent)

def hexPercent(color):
    percent = (color / float(0xFF)) * 100
    return percent

if __name__ == '__main__':
    try:
        for i in range(len(COLOR_PWM)):
            COLOR_PWM[i].start(1)

        duration = 2.0
        fps = 90.0

        i = 0
        while 1:
            colors = [
                [0xE8, 0x0B, 0x0B], # red
                [0x0b, 0x12, 0xe8], # blue
                [0xe8, 0xd5, 0x0b], # yellow
                [0xdd, 0x0b, 0xe8], # purple
                [0x1A, 0xe8, 0x0B], # green
                [0x0b, 0xd5, 0xe8]  # teal
            ]

            currentColor = colors[i % len(colors)]

            i = (i + 1) % len(colors)
            nextColor = colors[i]

            transition(currentColor, nextColor, duration, fps)

        # close execution by pressing CTRL + C
    except KeyboardInterrupt:
        print("Interrupted by user")
        pass
    finally:
        print("Program stopped")
        for colorPwm in COLOR_PWM:
            colorPwm.stop()
        GPIO.cleanup()

Run the code using the following command on the terminal and press CTRL + C to cancel anytime.

python rgb_transition.py

Conclusions

Linear color transition from red to blue

In the above gif, the LED makes a transition from RED to BLUE. I covered it with a square of opaque glass in order to highlight the transition.

This project can be expanded to work with a matrix/band of RGB LEDs and to create other types of transition. Feel free to share in the comments what you achieved.

Always aim for the best and never give up.

Posted in raspberrypi, aws

How to send notifications from Raspberry Pi using AWS SNS for FREE

Have you ever felt the need to be notified by your Raspberry Pi when something is happening in your home (and not only there)?

Well, I certainly have. I always forget a lot of things that I could be gently reminded of, like watering the plants, closing the door or turning off the light. Although I don't have a cat or a dog, I would definitely need to know if they were low on food or water (and many other situations).

If you want to be notified on your phone, this can get very expensive and complicated. Luckily, there is another way. Why spend money on a GSM modem and a prepaid phone card when you can get the same benefits for free using AWS SNS?

Use the comment section and tell me your thoughts about this. 

AWS prerequisite

If you never had any contact with AWS (Amazon Web Services), you can create an account for free at aws.amazon.com

But what is AWS SNS? AWS Simple Notification Service (SNS) makes it easier to send notifications to your phone or email address. It allows you to send 100 SMS and 100,000 emails each month for free (at the moment of writing this, so check the current free tier limits). I showed how easy it is to work with AWS in the post on how to publish temperature and humidity to CloudWatch. You can compare this service with the Observer design pattern: subscribers register to a topic and are notified when something is published to it.

The first thing that we need is a way to give our Raspberry Pi permission to publish to SNS topics. In the post mentioned above, I showed how to give permission to the CloudWatch service. The process is very similar, but we need to give access to AWS SNS instead. Please attach the AmazonSNSFullAccess permission policy to your IAM user. Don't forget to save the AWS access key id and secret access key in a safe place.

After we have the access keys, we need to go to the AWS SNS Console and follow this tutorial created by AWS to create a topic. After you have successfully created the topic, save the topic ARN; we will need it later.

I said that we will send the notifications to our phones and emails, so we need to add them as subscribers to the topic that we just created. Use the guidance provided by AWS to register them. For email addresses, don't forget that you have to confirm your registration. Strangely, for a phone you don't have to confirm that you want to receive notifications.

What do I need to get started?

It's very easy to integrate the AWS SNS functionality in your project. In order to prove this, I will use a project that detects if the door is open or closed. I will modify this project to send notifications with the door status (open/close).

To give you a little context about the project, it uses an ultrasonic sensor to read the distance to the nearest object. When the door is opened or closed, the distance increases or decreases as well. To determine the door status, it uses an if/else code block that compares the reading against a fixed distance. At first glance this doesn't seem to be a problem, but if the door stays open, the program will output:

Door Open # 1 notification
Door Open # 2 notifications
Door Open # 3 notifications
...
Door Open # n notifications

Every time this runs, it will send a notification. Because the number of notifications in the free tier is limited, we need to make some changes in order to send only 1 message. Even if money wasn't a problem, it would be very annoying and not cost effective to send so many alerts in a short span of time.

How can I send the messages only once?

After searching a little, the solution that I stumbled upon is to observe the percentage change between 2 consecutive distances (also known as Delta).

\text{Percentage change} = \frac{\Delta V}{V_1} \times 100 = \frac{V_2 - V_1}{V_1} \times 100

We put the sensor at 4cm from the door, so with the door closed the output is ~4cm. When the door opens, the distance will significantly increase. The reverse applies when the door closes: the distance significantly decreases. In this case, significantly means that the distance changes by more than 100% (you can increase or decrease this value as you like).

Let's suppose for simplicity that we read 4cm and for the next point we read 20cm. Even if the jump from 4cm to 20cm is not much in absolute terms, it gives a huge percentage change: (20cm - 4cm) / 4cm * 100 = 400%. Because 400 breaches our threshold of 100, we can say that the door has just opened.

Note: If the door opens (or closes) very slowly, this algorithm won't catch the status change. Before implementing this, make sure that you don't have any ninjas or cats in the house.
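The arithmetic can be checked with a few lines of code. percentageChange is a hypothetical helper that follows the formula directly, with v1 as the previous reading and v2 as the current one.

```python
def percentageChange(v1, v2):
    # (V2 - V1) / V1 * 100
    return (v2 - v1) / float(v1) * 100

opening = percentageChange(4, 20)   # 4cm -> 20cm
closing = percentageChange(20, 4)   # 20cm -> 4cm
```

Here opening is 400 and closing is about -80. A decrease can never go below -100% with this formula, which is why the closing check in the script swaps the two readings: 20cm compared against 4cm gives 400% again.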
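To see how this sends only one notification per change, the detection can be run over a list of recorded readings instead of a live sensor. This is a sketch: doorEvents is a hypothetical function, the readings are made up for illustration, and the list is assumed to contain at least one reading.

```python
def doorEvents(readings, threshold=100):
    events = []
    previous = readings[0]
    for current in readings[1:]:
        # opened: the distance grew by more than threshold percent
        if previous > 0 and (current - previous) / float(previous) * 100 > threshold:
            events.append('opened')
        # closed: the same check with the two readings swapped
        elif current > 0 and (previous - current) / float(current) * 100 > threshold:
            events.append('closed')
        previous = current
    return events

# the door opens once and closes once: two events, not one per reading
events = doorEvents([4, 4, 20, 20, 21, 4, 4])
# a door that opens slowly never breaches the threshold
slowEvents = doorEvents([4, 6, 9, 13, 19])
```

The second call returns an empty list, which is the limitation described in the note above.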

Take me to the code

In the snippet below I focused mainly on the AWS part of the code. The part that deals with the ultrasonic sensor can be found here.

The code is written for Python 2 (if you use Python 3, please search for the equivalent commands). In order to run it, you will need to install the AWS SDK for Python, which is called boto3.

pip install boto3

If you don't have pip you can install it using the following command

sudo apt-get install python-pip

Let's do the most important things first so that we don't forget: replace INSERT_AWS_ACCESS_KEY, INSERT_AWS_SECRET_ACCESS, and INSERT_TOPIC_ARN with your own.

In order to publish the message to our listeners, we need to use the publish method from the SNS client API. This method has the following parameters that we will use:

  • TopicArn - this is the topic identification (ARN) 
  • Message - the message that will be sent in the email/SMS, be careful with the text size because an SMS allows only 153 characters
  • Subject - (optional) this is used only in the emails (an SMS doesn't have a subject)
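The three parameters can be assembled in a small helper before calling publish. This is only a sketch: buildPublishArgs is a hypothetical function, the 153 character limit is the one quoted above, and cutting the message is my assumption about how to stay within it (it also shortens the email text).

```python
SMS_LIMIT = 153  # character limit quoted in the list above

def buildPublishArgs(topicArn, message, subject=None):
    args = {
        'TopicArn': topicArn,
        # cut the message so that it fits in one SMS
        'Message': message[:SMS_LIMIT]
    }
    if subject:
        # only emails make use of the subject
        args['Subject'] = subject
    return args

args = buildPublishArgs(
    'arn:aws:sns:us-east-1:1234567890:door-status',
    'Door opened. ' * 20,
    subject='Door status'
)
# with a boto3 SNS client this would be sent as:
# snsClient.publish(**args)
```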

For a comprehensive documentation about SNS you can check the following page, you can find details to build more sophisticated systems using this service.

import RPi.GPIO as GPIO
import time
import boto3

AWS_ACCESS_KEY = 'INSERT_AWS_ACCESS_KEY'
AWS_SECRET_ACCESS = 'INSERT_AWS_SECRET_ACCESS'
TOPIC_ARN = 'INSERT_TOPIC_ARN' # should look similar to arn:aws:sns:us-east-1:1234567890:door-status

# ....
# missing functions/constants can be found here 
# https://thehumblecode.com/blog/how-to-detect-your-door-activity-using-raspberry-pi-and-hc-sr04/
# ....

def publish(snsClient, message):
    # replace the TOPIC_ARN with your own
    response = snsClient.publish(
        TopicArn=TOPIC_ARN,
        Message=message,
        Subject='Door status'
    )
    print("Published with id %s" % response['MessageId'])

def percentChange(current, previous, difference):
    # True when current is more than `difference` percent above previous
    if previous == 0:
        return False  # avoid dividing by zero on a bad reading
    increase = current - previous
    increasePercentage = increase / float(previous) * 100
    return increasePercentage > difference

if __name__ == '__main__':
    try:
        snsClient = boto3.client(
            'sns',
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_ACCESS,
            region_name='us-east-1'
        )

        currentDistance = distance()
        previousDistance = currentDistance

        while 1:
            currentDistance = distance()
            print("Distance = %.1f cm" % currentDistance)

            if percentChange(currentDistance, previousDistance, 100):
                message = "Door opened. Distance greatly increased from %.1f cm to %.1f" % (previousDistance, currentDistance)
                print(message)
                publish(snsClient, message)
            elif percentChange(previousDistance, currentDistance, 100):
                message = "Door closed. Distance greatly decreased from %.1f cm to %.1f" % (previousDistance, currentDistance)
                print(message)
                publish(snsClient, message)

            previousDistance = currentDistance
            time.sleep(1)

        # close execution by pressing CTRL + C
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        print("Program stopped")
        GPIO.cleanup()

Run the code using the following command on the terminal and press CTRL + C to cancel anytime.

python distance.py


Final thoughts

I hope that you find this useful. AWS SNS is a tool that can come in handy in many situations and has a lot of potential for future projects.

If you run into any trouble or are curious about something, feel free to use the comment section to share your thoughts.

But most importantly, don't stop learning!