Cortex-M: share an int between two tasks

If a structure is only directly accessible (both read and write) to the "owner", some form of message transfer may be needed to update and distribute the structure.

All those tasks wanting to update some field(s) of a structure must send an update request to the owner.

If some other task wants to get an atomic snapshot of the structure, it sends a read request to the owner and receives in return the current snapshot of the struct.

The problem is how large a message can be transferred between tasks. If the size is unlimited, there is no problem delivering a snapshot of any size. However, if the maximum size is only a single word (16 or 32 bits), or only a few words, things get nasty. In that case the client must allocate a buffer for the snapshot and pass a pointer to it in the read request; the struct owner then copies the data into the requester's local buffer.

Reply to
upsidedown

This is a stop forever!

You use a flag to indicate that you should be stopping, and a semaphore to handle the wakeup (semaphores are intended for one task to give them, and another to take them).

Stop messing around with varying delays - that is an irrelevancy, and is complicating your code. Either your sensor task is running with regular samples, or it is stopped.

One possible arrangement could be:

static volatile bool sensor_stopped;
static SemaphoreHandle_t sensor_semaphore;

// TaskA
static const TickType_t delay_ticks = pdMS_TO_TICKS(10);

while (true) {
    // Get sample and process
    if (sensor_stopped) {
        // Wait for semaphore to be ready
        xSemaphoreTake(sensor_semaphore, portMAX_DELAY);
        // Then release it again
        xSemaphoreGive(sensor_semaphore);
    }
    vTaskDelay(delay_ticks);    // or vTaskDelayUntil
}

// TaskB
static void turn_off_sensor(void)
{
    sensor_stopped = true;
    xSemaphoreTake(sensor_semaphore, 0);
}

static void turn_on_sensor(void)
{
    sensor_stopped = false;
    xSemaphoreGive(sensor_semaphore);
}

The "sensor_stopped" flag is really just an optimisation, so that TaskA doesn't have to keep checking the semaphore. It can be dropped if you want.

A really simple solution would be:

static volatile bool sensor_stopped;

// TaskA
static const TickType_t delay_ticks = pdMS_TO_TICKS(10);

while (true) {
    if (!sensor_stopped) {
        // Get sample and process
    }
    vTaskDelay(delay_ticks);    // or vTaskDelayUntil
}

// TaskB
static void turn_off_sensor(void)
{
    sensor_stopped = true;
}

static void turn_on_sensor(void)
{
    sensor_stopped = false;
}

This all depends on why you want to stop the sensor task - if you are going into low power modes, for example, then it may be best to block the task. If you don't mind a regular task doing nothing useful, then the second choice is fine.

Reply to
David Brown

Why don't you simply make each sampling task like

loop
    wait_for_event()
    do_the_sampling()
end loop

Then have a separate timing task, which runs, say, every millisecond and increments a single integer counter.

For each task, calculate (Counter modulo NN) and send a message to a specific sampler task when the result is zero. NN is the number of clock ticks before a specific sampling is done.

If you want to do sampling say every 10 ms, but want to spread out the sampling times, add a constant 1, 2, 3 to a copy of the counter before performing the modulo, and one task will sample at xx7, the other at xx8 and a third at xx9 milliseconds every 10 ms. This way, you can control the sampling times and periods from a single point in the timing task, with no need to spread this out to the individual sampler tasks.

Reply to
upsidedown

In Cortex-M4 there are exclusive access instruction pairs LDREX / STREX, LDREXH / STREXH and LDREXB / STREXB which can be used to effect mutual exclusion on a variable access.

They are actually pairs where the store part refuses to work if there has been an exception between the load and store halves of the pair. The store returns a result indicating success or failure.

If the thread switching is done with the recommended PendSV exception, the exclusion works, if applied suitably.

--
-TV 


On 18.3.20 22:24, David Brown wrote: 
Reply to
Tauno Voipio

These instructions can be useful, but difficult to use. Generally they are part of how you implement mutexes, semaphores, and other synchronisation mechanisms - they are not sufficient on their own. It's really easy to follow a "this is how you make a mutex with ldrex/strex" recipe and get something that will deadlock as soon as there is contention, because the recipe only works when you have time-sharing or multiple cores.

And on single-core microcontroller-oriented cpus, like the Cortex-M, disabling global interrupts for a few instructions is generally more efficient and much easier to get right.

Reply to
David Brown

Been there - done that.

I have two parallel versions of the same kernel, one with interrupt disable and the other with LDREX / STREX.

Both ways work, and I agree that the global interrupt disable is easier to use, but it may create unacceptable delays on high-priority interrupts.

Would you provide an example where LDREX / STREX will create a deadlock on a single processor? Remember that any interrupt service will cancel the exclusivity, as does a thread switch.

--

-TV
Reply to
Tauno Voipio

I haven't implemented any real kernels, so I'm happy to learn from your experience there.

I think people get a bit obsessed with interrupt disable times. Many programmers get in a fluster when you write code that disables interrupts for 4 or 5 instructions - yet have no problem writing an interrupt routine that takes 50 to 200 instruction clocks to execute, with that amount of unknown jitter. (Yes, I am ignoring interrupt priorities for simplification.)

If you need sub-microsecond reaction times for something, use hardware - not software. That's why your microcontroller has timers, communication ports, DMA, and the like. And that means you can disable interrupts for a microsecond without causing trouble - that's a /long/ time on a modern Cortex-M.

And when you go to a faster device, like NXP's 600 MHz Cortex-M7 "crossover" chips, an interrupt-disable based function for atomic 64-bit access, or a CAS operation, running entirely from tightly-coupled memory will run faster than a single "load" instruction whose code and data are not in the cache. And that load operation delays interrupts just as much as the interrupt-disable block does.

Even on a M4, a divide instruction can take 12 cycles - you can have your interrupts disabled, your load/stores done, and interrupts re-enabled in that time.

Yes, disabling interrupts may cause unacceptable delays - but you should look closely at what delays are acceptable and unacceptable. Don't dismiss interrupt disable blocks on principle without actually running the numbers.

It's the same as any other way of implementing locks that does not take into account priority inversion. Suppose Task A has higher priority than Task B. (Task A could be an interrupt - the details don't matter too much.)

Task A:
1. Take lock.
2. Use shared resource.
3. Release lock.

Task B:
1. Take lock.
2. Use shared resource.
3. Release lock.

If task B has passed step 1 when task A is scheduled (such as the interrupt being triggered), task A will block on step 1, and task B never gets to step 3.

It is not that LDREX/STREX are special here - you'd get the same problem using interrupt-disable blocks (if you are using them to access other locks, rather than using the interrupt enable as the lock itself). The problem is that the net is full of examples like:

"""
8.5.3. Example of LDREX and STREX usage

The following is an example of typical usage. Suppose you are trying to claim a lock:

Lock address : LockAddr
Lock free    : 0x00
Lock taken   : 0xFF

        MOV     R1, #0xFF           ; load the 'lock taken' value
try     LDREX   R0, [LockAddr]      ; load the lock value
        CMP     R0, #0              ; is the lock free?
        STREXEQ R0, R1, [LockAddr]  ; try and claim the lock
        CMPEQ   R0, #0              ; did this succeed?
        BNE     try                 ; no - try again
        ...                         ; yes - we have the lock
"""

(That's from ARM's documentation.)

It is very seductive to think that LDREX/STREX lets you implement locks, and that these recipes are good enough. And when you use them, and test them, everything seems to work. But I'm sure I don't need to tell you that this is not sufficient - it will only work if both Task A and Task B can be running.

Reply to
David Brown

Il 19/03/2020 12:12, David Brown ha scritto: ...

Yes, but in my example I used vTaskDelayAbort(), so it isn't really a stop forever.

Honestly, taskB should change the sampling interval too. Anyway, the big issue is stopping and restarting, not changing the interval.

I'm not sure about xSemaphoreTake(sensor_semaphore, 0) in turn_off_sensor(). You pass a zero timeout: are you sure you will be able to take the semaphore in *every* situation? What happens if turn_off_sensor() runs immediately after xSemaphoreTake(..., portMAX_DELAY) returns? In this case, I think taskB wouldn't be able to take the semaphore, and so taskA doesn't really block. You will have sensor_stopped true, but the semaphore released.

Yes, this is another solution that doesn't really stop the task, but only the sampling process.

Reply to
pozz

Small digression: (out of ignorance of the M3) - is there nothing you have to do with an MMU or other caching furniture to guarantee serialization of access? So "volatile" is known to be sufficient?

I'd be tempted to put a mutex on it and measure the cost, because race conditions are quite ... challenging to test for.

--
Les Cargill
Reply to
Les Cargill

Yes, volatile is sufficient here. Almost all processors have a serial processing model - that is, no matter how much superscalar and out-of-order execution you have (the M3 has neither, but the M7 is superscalar and can often do two instructions per cycle), the result is as though instructions execute fully in program order. Writes can end up re-ordered before they hit memory, due to caches or write buffers, but this is invisible to the code.

You need data ordering instructions (like "dsb" or "dmb" on the Cortex-M) or MPU memory regions (such as non-cacheable areas) when the data is accessed by something other than the processor that reads and writes it. So if you are writing data to a buffer that will be read by DMA, or you have a dual processor system (as found in some Cortex-M devices), or an SMP system (like Cortex-A devices) it's a different matter - you need data synchronisation.

Also remember that volatile accesses do not synchronise with non-volatile accesses. You can't do some non-volatile writes to a buffer, then set a volatile flag and think the writes will always come before the flag.

You avoid race conditions by design, not testing!

Reply to
David Brown

The key point is that things are different for a single-processor/single-master system than when you have multiple processors/bus masters. A single processor system that didn't keep accesses in order, at least from its own view, would be seriously broken. At worst there could be a few cycles of delay slots, which the compiler would keep track of, that needed to be observed between a write and a read.

The one catch with a single processor system is if it has something like DMA, then the memory accessed by the DMA needs the protections like a multiprocessor system.

The volatile system was invented back at a time when, for most machines, it was sufficient for most types of access.

Reply to
Richard Damon

It's also worth noting that even though "volatile" is not sufficient for bigger systems (multiple bus masters, like multiple cores or DMA), it is generally still /necessary/ along with the memory barriers, cache flushes, synchronisation instructions, locks, etc.

Sometimes with modern C or C++ you can use atomic types rather than volatile, since the atomic types are effectively volatile in themselves (as well as having other properties and synchronisation features).

And of course compiler-specific features can replace the need of volatile (such as using a gcc asm memory clobber).

Reply to
David Brown

Il 19/03/2020 15:31, pozz ha scritto:

David I hope you can answer to this. This topic is quite confusing to me. Thank you.

Reply to
pozz

I will admit that it isn't my area of expertise, as most of my programming is done on machines where volatile is sufficient for most of the needs, but I remember arguments that for the big machines volatile wasn't needed at all, as you needed a barrier that said everything that happened before this will be done before anything that happens after this - and with such a barrier volatile isn't needed.

It may well be, and I think it is, that the Standards adopted weaker barriers (that are perhaps significantly cheaper to perform) and with those weaker barriers we still have a need for volatile.

The one key thing that I remember is that the compiler needs to understand these barriers, as it needs to avoid moving accesses across them.

Reply to
Richard Damon

A Take with a zero timeout will lower the semaphore if it happens to be raised, and just continue if it was already lowered. This will cause task A to go and block on the semaphore until task B gives it again. The take operation returns a flag indicating whether it succeeded, and Task A should really use:

if (xSemaphoreTake(sensor_semaphore, portMAX_DELAY)) {
    xSemaphoreGive(sensor_semaphore);
}

Otherwise, if something causes the Take to abort, the sampling will start back up.

I would probably do the take before setting the flag, so that if TaskA runs between the two statements it can't take the semaphore and slip past.

Reply to
Richard Damon

If portMAX_DELAY is used in xSemaphoreTake(), I think the function returns only if it was able to take the semaphore (if we don't use functions that abruptly let one task out of blocked state).

Anyway my objection is still there. If turn_off_sensor() is executed immediately after xSemaphoreTake(sensor_semaphore, portMAX_DELAY) returns, taskB wouldn't be able to take the semaphore, so taskA will wrongly continue doing its work.

I think we can use portMAX_DELAY in turn_off_sensor() too. In the worst case, taskB would wait for a very short time to take the semaphore, but usually it takes the semaphore immediately.

Reply to
pozz

Yes - if you have such a barrier, volatile is generally not needed (that was one of the exceptions I gave). But the barrier needs to be defined correctly.

If you write:

uint64_t data;
bool ready_flag;

data = 123;
asm ("dmb");
ready_flag = true;

then the "dmb" instruction will form a memory barrier - all previously encountered memory operations are completed before the "dmb" happens, and none of the following memory operations are started. But there is nothing stopping the compiler re-arranging the code like:

ready_flag = true;
asm ("dmb");
data = 123;

Instructions like "dmb" force an order on the cpu operations, not the compiler - while "volatile" enforces a partial order on the compiler, but not the hardware.

The standard solution would be asm("dmb" ::: "memory"), where the memory clobber forces an ordering on the compiler - and thus you can (usually) omit the "volatile".

The C11 and C++11 atomics have a number of barriers - some stronger, some weaker in various senses. Prior to C11 and C++11, there were no standard barriers at all.

Yes.

Reply to
David Brown

Yes, I think the code is not quite right. I am not happy about it anyway - it doesn't feel good. But as I said before, I think you would be better with a re-think of the design at a higher level.

That depends on the timing requirements, but could work.

Reply to
David Brown

Yes, if TaskB has priority greater than or equal to TaskA, then it could come in between the Take and the Give. One solution would be to make the Give back conditional on not stopped, as well as the getting and processing of the sample. Something like:

while (true) {
    if (sensor_stopped) {
        xSemaphoreTake(sensor_semaphore, portMAX_DELAY);
        if (!sensor_stopped) {
            xSemaphoreGive(sensor_semaphore);
        }
    } else {
        // Get sample and process
        vTaskDelay(delay_ticks);
    }
}

With this, if TaskB just sets the stopped flag, TaskA will take the semaphore, not give it, then loop around and take it again and wait. If TaskB does a 0 timeout take, it can prevent that extra loop.

If TaskA gets accidentally woken up, then it still doesn't give the semaphore and blocks on the next loop.

Reply to
Richard Damon

David, if you have time, many of us would appreciate a quick summary of the situations where the different barriers should be used.

"dmb" can be a full read/write barrier, or just a write barrier. When to use each?

"dsb" is different from "dmb" because it limits instruction ordering, but when is that useful? When to use it as a write-only barrier?

"isb" is intended to precede a context switch, i.e. task switching in an RTOS. Is it sufficient for that, and is that the only time to use it?

If you know a good article that gives practical guidelines, post a link, otherwise I'd really like to hear your thoughts.

Clifford Heath.

Reply to
Clifford Heath
